Recently I set up a Flask POST endpoint to write data into Impala DB via the Impyla module.

Env: Python 3.6.5 on CentOS.

Impala version: impalad version 2.6.0-cdh5.8.0

api.py:

from flask import Flask, request, abort, Response
from flask_cors import CORS
import json
from impala.dbapi import connect
import sys
import re
from datetime import datetime


app = application = Flask(__name__)
CORS(app)


conn = connect(host='datanode2', port=21050,
            user='user', database='testdb')


@app.route("/api/endpoint", methods=['POST'])
def post_data():
    # if not request.json:
    #     abort(400)

    params = request.get_json(force=True)  # getting request data
    print(">>>>>> ", params, flush=True)

    params['log_time'] = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
    # params['page_url'] = re.sub(
    #     '[^a-zA-Z0-9-_*.]', '', re.sub(':', '_', params['page_url']))

    try:
        cursor = conn.cursor()

        sql = "INSERT INTO table ( page_title, page_url, log_time, machine, clicks, id ) VALUES (%s, %s, %s, %s, %s, %s)"
        values = (params['page_title'], params['page_url'], params['log_time'],
                params['machine'], params['clicks'], params['id'])
        print(">>>>>> " + sql % values, file=sys.stderr, flush=True)

        cursor.execute(sql, values)

        print(
            f">>>>>> Data Written Successfully", file=sys.stderr, flush=True)
        return Response(json.dumps({'success': True}), 201, mimetype="application/json")
    except Exception as e:
        print(e, file=sys.stderr, flush=True)
        return Response(json.dumps({'success': False}), 400, mimetype="application/json")


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5008, debug=True)

req.py:

import requests as r

url = "http://123.234.345.456:30001/"
# url =  "https://stackoverflow.com/questions/ask"

res = r.post('http://localhost:5008/api/endpoint', 
            json={             
                "page_title": "Home",   
                "page_url": url,
                "machine": "Mac OS",
                "clicks": 16,
                "id": "60cd1d79-eda7-44c2-a4ec-ffdd5d6ac3db"         
            }
        )

if res.ok:
    print(res.json())
else:
    print('Error!')

I ran the Flask API with python api.py and then tested it with python req.py.

The flask server gives this error:

>>>>>>  {'page_title': 'Home', 'page_url': 'http://123.234.345.456:30001/', 'machine': 'Mac OS', 'clicks': 16, 'id': '60cd1d79-eda7-44c2-a4ec-ffdd5d6ac3db'}
>>>>>> INSERT INTO table ( page_title, page_url, log_time, machine, clicks, id ) VALUES (Home, http://123.234.345.456:30001/, 2018-12-12 16-14-04, Mac OS, 16, 60cd1d79-eda7-44c2-a4ec-ffdd5d6ac3db)
AnalysisException: Syntax error in line 1:
..., 'http://123.234.345.456'2018-12-12 16-14-04'0001/', ...
                         ^
Encountered: INTEGER LITERAL
Expected: AND, AS, ASC, BETWEEN, CROSS, DESC, DIV, ELSE, END, FOLLOWING, FROM, FULL, GROUP, HAVING, ILIKE, IN, INNER, IREGEXP, IS, JOIN, LEFT, LIKE, LIMIT, NOT, NULLS, OFFSET, OR, ORDER, PRECEDING, RANGE, REGEXP, RIGHT, RLIKE, ROWS, THEN, UNION, WHEN, WHERE, COMMA, IDENTIFIER

CAUSED BY: Exception: Syntax error

This error is puzzling:

  1. Running the same INSERT statement directly in impala-shell works.

  2. When page_url is the only parameter, it also works.

So is this some kind of conditional character-escaping issue? I managed to work around it by sanitising the URL with a regular expression (uncomment lines 27-28 of api.py), but I don't want to have to alter my data just to get around this.

From other people's attempts, it seems that wrapping each inserted value in a pair of quotes should work. But how can I do that with string formatting, given that it has to happen before cursor.execute(sql, values)?

davidism
suvtfopw
2 Answers

After some struggling, and with great help from @Scratch'N'Purr and @msafiullah at Parameter substitution issue #317, I managed to make it work. It is somewhat involved, so I am posting the full code for documentation:

Cause of the error: Impyla's parameter substitution mishandles colons inside values (the :3 in the URL's port appears to be treated as a placeholder).

Solution: Escape each value with a custom function and build the SQL string with Python's string formatting, instead of passing the parameters the standard Python DB API way, e.g. cursor.execute(sql, values).

api.py:

from flask import Flask, request, abort, Response
from flask_cors import CORS
import json
from impala.dbapi import connect
from impala.util import _escape
import sys    
from datetime import datetime
import six

app = application = Flask(__name__)
CORS(app)


conn = connect(host='datanode2', port=21050,
            user='user', database='testdb')


def parameterize(value): # by msafiullah
    if value is None:
        return "NULL"
    elif isinstance(value, six.string_types):
        return "'" + _escape(value) + "'"
    else:
        return str(value)


@app.route("/api/endpoint", methods=['POST'])
def post_data():
    if not request.json:
        abort(400)

    params = request.get_json(force=True)  # getting request data
    print(">>>>>> ", params, flush=True)

    params['log_time'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    try:
        cursor = conn.cursor()

        sql = 'INSERT INTO table ( page_title, page_url, log_time, machine, clicks, id ) VALUES ( CAST({} AS VARCHAR(64)), {}, {}, CAST({} AS VARCHAR(32)) , {}, CAST({} AS VARCHAR(32)))'\
                .format(parameterize(params['page_title']), parameterize(params['page_url']), parameterize(params['log_time']), parameterize(params['machine']), params['clicks'], parameterize(params['id']))
        print(">>>>>> " + sql, file=sys.stderr, flush=True)

        cursor.execute(sql)

        print(
            f">>>>>> Data Written Successfully", file=sys.stderr, flush=True)
        return Response(json.dumps({'success': True}), 201, mimetype="application/json")
    except Exception as e:
        print(e, file=sys.stderr, flush=True)
        return Response(json.dumps({'success': False}), 400, mimetype="application/json")


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5008, debug=True)

req.py is the same as in the question.

table schema:

CREATE TABLE if not exists table (
    id VARCHAR(36),
    machine VARCHAR(32),
    clicks INT,
    page_title VARCHAR(64),
    page_url STRING,
    log_time TIMESTAMP
);

Flask's server output:

>>>>>>  {'page_title': 'Home', 'page_url': 'http://123.234.345.456:30001/', 'machine': 'Mac OS', 'clicks': 16, 'id': '60cd1d79-eda7-44c2-a4ec-ffdd5d6ac3db'}
>>>>>> INSERT INTO table ( page_title, page_url, log_time, machine, clicks, id ) VALUES ( CAST('Home' AS VARCHAR(64)), 'http://123.234.345.456:30001/', '2018-12-14 17:27:29', CAST('Mac OS' AS VARCHAR(32)) , 16, CAST('60cd1d79-eda7-44c2-a4ec-ffdd5d6ac3db' AS VARCHAR(32)))
>>>>>> Data Written Successfully
127.0.0.1 - - [14/Dec/2018 17:27:29] "POST /api/endpoint HTTP/1.1" 201 -

Inside Impala-shell, select * from table will give:

+----------------------------------+---------+--------+------------+-------------------------------+---------------------+
| id                               | machine | clicks | page_title | page_url                      | log_time            |
+----------------------------------+---------+--------+------------+-------------------------------+---------------------+
| 60cd1d79-eda7-44c2-a4ec-ffdd5d6a | Mac OS  | 16     | Home       | http://123.234.345.456:30001/ | 2018-12-14 17:27:29 |
+----------------------------------+---------+--------+------------+-------------------------------+---------------------+

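Basically, only numbers (e.g. INT type) can skip the parameterize() escaping step. Other types such as VARCHAR, CHAR, STRING and TIMESTAMP (because of the colons) must be escaped properly to be inserted safely through the Impyla API. Note that the id in the output above is cut to 32 characters because the INSERT casts it to VARCHAR(32); casting to VARCHAR(36), as in the table schema, keeps the full 36-character UUID.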
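To see what the quoting step produces without a running Impala cluster, here is a minimal, self-contained sketch of the same idea. The helper escape_sql_string is a hypothetical stand-in for impala.util._escape (it is an assumption that backslash-escaping backslashes and single quotes is sufficient for your data), so treat it as an illustration rather than a replacement for the library function.

```python
def escape_sql_string(value):
    """Hypothetical stand-in for impala.util._escape (an assumption):
    backslash-escape backslashes first, then single quotes."""
    return value.replace("\\", "\\\\").replace("'", "\\'")


def parameterize(value):
    """Render a Python value as a SQL literal, as in the answer above."""
    if value is None:
        return "NULL"
    if isinstance(value, str):
        return "'" + escape_sql_string(value) + "'"
    return str(value)


# Strings are quoted and escaped, None becomes NULL, numbers pass through.
row = ("Home", "http://123.234.345.456:30001/", None, 16)
rendered = ", ".join(parameterize(v) for v in row)
print(rendered)  # 'Home', 'http://123.234.345.456:30001/', NULL, 16
```

Because the statement is assembled by hand, every string value that reaches the INSERT should go through parameterize(); skipping it for even one field reopens the door to broken or malicious input.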

suvtfopw

Impyla and other Impala-based Python libraries don't support parameterized queries the way traditional SQL databases do. The only solution I have come across is to wrap the inserted values in quotes when the columns are defined as string/timestamp types.

You ask how to do this with string formatting before executing the query. Simply apply the string formatting and then insert the formatted values.

In your example, let's assume your table had the following type definitions:

CREATE TABLE table (
    page_title VARCHAR(64),
    page_url STRING,
    log_time TIMESTAMP,
    machine VARCHAR(64),
    clicks INT,
    id CHAR(36)
)

Then your insert statement would be:

sql = "INSERT INTO table ( page_title, page_url, log_time, machine, clicks, id ) VALUES ('%s', '%s', '%s', '%s', %s, '%s')"  # note the single quotes around the string/timestamp types

Now since log_time is a timestamp type, you'll have to format your datetime.now() to the yyyy-MM-dd HH:mm:ss format.

params['log_time'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

If you had defined log_time as STRING instead of TIMESTAMP, then your format of %Y-%m-%d %H-%M-%S would work.
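As a quick check of the two formats discussed here, the sketch below renders one fixed moment both ways. The variable names are illustrative only; the fixed datetime is chosen to match the timestamp in the question's log output.

```python
from datetime import datetime

# A fixed moment so the output is reproducible.
moment = datetime(2018, 12, 12, 16, 14, 4)

# Colon-separated time: the yyyy-MM-dd HH:mm:ss shape needed for TIMESTAMP.
with_colons = moment.strftime("%Y-%m-%d %H:%M:%S")

# Hyphen-separated time: only suitable if log_time is stored as STRING.
with_hyphens = moment.strftime("%Y-%m-%d %H-%M-%S")

print(with_colons)   # 2018-12-12 16:14:04
print(with_hyphens)  # 2018-12-12 16-14-04
```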

Finally, execute:

values = (params['page_title'], params['page_url'], params['log_time'],
          params['machine'], params['clicks'], params['id'])
cursor.execute(sql, values)

Note that this method only works when you're working with basic data types such as numerics or strings. Anything complex such as arrays or structs won't work.

Scratch'N'Purr
  • But Impyla explicitly *does* support parameterised queries, see https://github.com/cloudera/impyla/blob/master/impala/tests/test_query_parameters.py – Daniel Roseman Dec 12 '18 at 09:33
  • @DanielRoseman Hmm, this must have been new since the last time I did use impyla. Then in that case, there's no need to do any upstream string formatting or wrapping values in quotes. I wonder if it still applies to parameterized inserts. – Scratch'N'Purr Dec 12 '18 at 09:46
  • @Scratch'N'Purr Thanks so much for the solution! I actually had to use STRING for my `log_time` because of escaping issue as well...! Now I am trying to alter my table back on track – suvtfopw Dec 14 '18 at 04:00
  • @DanielRoseman Interesting, didn't notice that as well. It would be really nice if this works. Code just becomes cleaner and readable. – suvtfopw Dec 14 '18 at 04:16
  • I found that we cannot use single quotes in insert statement, it will make a crash with the default quotes: `AnalysisException: Syntax error in line 1: ...e' AS VARCHAR(64)), ''http://123.234.345.456'2018-12-1...` – suvtfopw Dec 14 '18 at 07:26
  • Now an interesting thing comes: it inserts successfully but the `page_url` field shows the `log_time` instead of the input url....... – suvtfopw Dec 14 '18 at 07:44
  • Hmmm it is indeed interesting. It seems like the parameterized insert is treating the `:3` of the url as a placeholder (`'http://123.234.345.456'2018-12-12 16-14-04'0001/'`). Perhaps the only other option is use sql injection (e.g. `sql = "INSERT INTO table ( page_title, page_url, log_time, machine, clicks, id ) VALUES ('{}', '{}', '{}', '{}', {}, '{}')".format(params['page_title'], params['page_url'], ...)` – Scratch'N'Purr Dec 14 '18 at 07:54
  • On Github, this person suggests the workaround to use parameterised queries. [Parameter substitution issue #317](https://github.com/cloudera/impyla/issues/317) – suvtfopw Dec 14 '18 at 08:10
  • @jsnceo Thanks for sharing! – Scratch'N'Purr Dec 14 '18 at 08:15
  • I still can't get them into the right fields. The colon issue is problematic. – suvtfopw Dec 14 '18 at 08:42
  • Oops, I just suddenly got it right. Will post my solution as a new answer! – suvtfopw Dec 14 '18 at 09:29
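
The placeholder collision discussed in the comments can be reproduced with a toy binder. This is not Impyla's actual code: toy_bind is a hypothetical function that assumes a scheme which first fills %s markers in order and then replaces any remaining :N text with the N-th parameter. Under that assumption, the :3 at the start of the port number is overwritten with the third value, giving the same garbled fragment as in the AnalysisException.

```python
def toy_bind(operation, parameters):
    """Toy binder for illustration only (NOT Impyla's implementation).

    Step 1: fill each %s marker with the next quoted parameter.
    Step 2: replace every ':N' occurrence with the N-th parameter,
            which also hits ':N' text that came from a value.
    """
    rendered = ["'" + p + "'" if isinstance(p, str) else str(p)
                for p in parameters]
    for text in rendered:
        operation = operation.replace("%s", text, 1)
    for index in range(len(rendered), 0, -1):
        operation = operation.replace(":" + str(index), rendered[index - 1])
    return operation


sql = "VALUES (%s, %s, %s)"
values = ("Home", "http://123.234.345.456:30001/", "2018-12-12 16-14-04")
bound = toy_bind(sql, values)
print(bound)
```

With fewer than three parameters there is no third value to collide with :3, which is consistent with the observation that the insert works when page_url is the only parameter.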