liteshort.py 9.8 KB
Newer Older
132ikl's avatar
132ikl committed
1
from flask import Flask, current_app, flash, g, jsonify, redirect, render_template, request, url_for
2
3
4
5
import bcrypt
import random
import sqlite3
import time
132ikl's avatar
132ikl committed
6
import urllib
7
8
import yaml

132ikl's avatar
132ikl committed
9
10
# Module-level Flask application object; the route and teardown decorators
# further down in this file register their handlers on it.
app = Flask(__name__)

11
12

def load_config():
    """Load ``config.yml`` from the working directory and normalise it.

    Keys are lower-cased, missing options are filled with defaults, the
    options that have a declared type are type-checked, and a
    ``password_hashed`` flag is derived from which password option is set.

    Returns:
        dict: the normalised configuration.

    Raises:
        TypeError: if a typed option has the wrong type, or if neither
            ``admin_password`` nor ``admin_hashed_password`` is configured.
    """
    # NOTE: the original used a bare yaml.load(), which can construct arbitrary
    # Python objects and is rejected without a Loader by PyYAML >= 6.
    # safe_load only builds plain scalars/lists/dicts, which is all a config
    # file needs. The `with` block also closes the file handle.
    with open('config.yml') as config_file:
        loaded = yaml.safe_load(config_file)

    # An empty file parses to None; treat it as an empty mapping so the
    # "password must be set" error below is raised instead of AttributeError.
    new_config = {k.lower(): v for k, v in (loaded or {}).items()}  # Make config keys case insensitive

    req_options = {
        'admin_username': 'admin',
        'database_name': "urls",
        'random_length': 4,
        'allowed_chars': 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_',
        'random_gen_timeout': 5,
        'site_name': 'liteshort',
        'site_url': None,
    }

    # Every entry is a tuple so an option may accept several types.
    config_types = {
        'admin_username': (str,),
        'database_name': (str,),
        'random_length': (int,),
        'allowed_chars': (str,),
        'random_gen_timeout': (int,),
        'site_name': (str,),
        'site_url': (str, type(None)),
    }

    # Make sure everything in req_options is set in config
    for option, default in req_options.items():
        new_config.setdefault(option, default)

    # Exact type match on purpose (not isinstance): bool is a subclass of int,
    # so isinstance would accept e.g. `random_length: true`.
    for option, allowed_types in config_types.items():
        if type(new_config[option]) not in allowed_types:
            raise TypeError(option + " is incorrect type")

    # Record whether bcrypt is required to check the admin password
    if 'admin_hashed_password' in new_config:
        new_config['password_hashed'] = True
    elif 'admin_password' in new_config:
        new_config['password_hashed'] = False
    else:
        raise TypeError('admin_password or admin_hashed_password must be set in config.yml')
    return new_config


132ikl's avatar
132ikl committed
49
50
51
52
53
54
55
56
57
58
59
60
def authenticate(username, password):
    """Check *username* / *password* against the configured admin account.

    The password is only checked (via ``check_password``) when the username
    matches ``admin_username`` in the application config.
    """
    if username != current_app.config['admin_username']:
        return False
    return check_password(password, current_app.config)


def check_long_exist(long):
    """Return an existing short name for *long*, or ``False``.

    Only short names no longer than the configured ``random_length`` are
    considered; the first such row returned by the query wins.
    """
    rows = query_db('SELECT short FROM urls WHERE long = ?', (long,))
    return next(
        (
            row['short']
            for row in rows
            if row and len(row['short']) <= current_app.config["random_length"]
        ),
        False,
    )


132ikl's avatar
132ikl committed
61
62
def check_short_exist(short):
    """Return ``True`` when ``get_long`` finds a long URL for *short*, else ``False``."""
    return bool(get_long(short))


67
68
69
70
71
72
73
74
75
def check_password(password, pass_config):
    """Verify *password* against the admin credential in *pass_config*.

    Args:
        password: the candidate password (expected to be a ``str``).
        pass_config: mapping with a ``password_hashed`` flag and either
            ``admin_hashed_password`` (bcrypt hash) or ``admin_password``
            (plaintext), matching the flag.

    Returns:
        bool: ``True`` when the password matches, ``False`` otherwise
        (including when *password* is not a string).
    """
    # Local import keeps this fix self-contained; hmac is standard library.
    import hmac

    # A missing password (None) can never match; bail out before .encode().
    if not isinstance(password, str):
        return False

    candidate = password.encode('utf-8')
    if pass_config['password_hashed']:
        return bcrypt.checkpw(candidate, pass_config['admin_hashed_password'].encode('utf-8'))

    stored = pass_config['admin_password']
    # A non-string configured password never equalled a str candidate under
    # the original `==`; keep that result.
    if not isinstance(stored, str):
        return False
    # Constant-time comparison so response timing does not leak how much of
    # the password matched. Comparing bytes also allows non-ASCII passwords.
    return hmac.compare_digest(candidate, stored.encode('utf-8'))


132ikl's avatar
132ikl committed
76
77
78
79
80
def delete_url(deletion):
    """Delete the rows whose ``short`` equals *deletion*.

    Returns the number of rows that matched before the delete was issued.
    """
    params = (deletion,)
    # row_factory=None makes the query return plain tuples instead of Row objects
    matching = query_db('SELECT * FROM urls WHERE short = ?', params, False, None)
    db = get_db()
    db.cursor().execute('DELETE FROM urls WHERE short = ?', params)
    db.commit()
    return len(matching)
81
82


132ikl's avatar
132ikl committed
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
def dict_factory(cursor, row):
    """Row factory that maps each column name in ``cursor.description`` to its value in *row*."""
    return {column[0]: row[position] for position, column in enumerate(cursor.description)}


def generate_short(rq):
    """Generate a random short name that is not already in use.

    Candidates are built from ``allowed_chars`` with ``random_length``
    characters. If ``random_gen_timeout`` seconds pass without finding a free
    name, the result of ``response(rq, None, ...)`` with a timeout message is
    returned instead of a string.
    """
    config = current_app.config
    deadline = time.time() + config['random_gen_timeout']
    while time.time() < deadline:
        candidate = ''.join(
            random.choice(config['allowed_chars']) for _ in range(config['random_length'])
        )
        if not check_short_exist(candidate):
            return candidate
    return response(rq, None, 'Timeout while generating random short URL')


132ikl's avatar
132ikl committed
101
102
103
104
105
106
107
def get_long(short):
    """Return the long URL stored for *short*, or ``None`` if there is no row or the value is empty."""
    match = query_db('SELECT long FROM urls WHERE short = ?', (short,), True)
    if not match:
        return None
    return match['long'] or None


132ikl's avatar
132ikl committed
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def list_shortlinks():
    """Return all rows of ``urls`` as a dict mapping each row's first column to its second."""
    # row_factory=None yields plain tuples, which nested_list_to_dict indexes by position
    rows = query_db('SELECT * FROM urls', one=False, row_factory=None)
    return nested_list_to_dict(rows)


def nested_list_to_dict(l):
    """Build a dict from a sequence of pairs: element 0 is the key, element 1 the value.

    When a key occurs more than once, the last occurrence wins.
    """
    return {pair[0]: pair[1] for pair in l}


def response(rq, result, error_msg="Error: Unknown error"):
    """Build the reply for a request, as JSON or as the rendered main page.

    * A form with ``api`` but no ``format`` gets a plain error string.
    * ``format == 'json'``: a falsy *result* yields ``success=False`` with
      *error_msg*; ``True`` yields a bare success; anything else is sent back
      as ``result``.
    * Otherwise the main template is rendered, flashing *error_msg* for a
      falsy *result* or the result itself when it is not ``True``.
    """
    form = rq.form
    has_format = 'format' in form

    if 'api' in form and not has_format:
        return "Format type HTML (default) not support for API"  # Future-proof for non-json return types

    if has_format and form['format'] == 'json':
        # A missing/falsy result means failure, so callers can report an
        # error just by passing None together with a message.
        if not result:
            return jsonify(success=False, error=error_msg)
        if result is True:  # Success with nothing to report (ie. during deletion)
            return jsonify(success=True)
        return jsonify(success=True, result=result)

    if not result:
        flash(error_msg, 'error')
    elif result is not True:
        flash(result, 'success')
    return render_template("main.html")
132ikl's avatar
132ikl committed
142
143
144
145
146
147
148
149
150


def validate_short(short):
    """Check every character of *short* against the configured ``allowed_chars``.

    Returns ``True`` when all characters are allowed; otherwise returns the
    error reply produced by ``response`` for the first offending character.
    """
    allowed = current_app.config['allowed_chars']
    offending = next((char for char in short if char not in allowed), None)
    if offending is None:
        return True
    return response(request, None,
                    'Character ' + offending + ' not allowed in short URL')

151

132ikl's avatar
132ikl committed
152
153
154
def validate_long(long):  # https://stackoverflow.com/a/36283503
    """Return ``True`` when *long* parses as a URL with both a scheme and a network location.

    Malformed input that makes ``urlparse`` raise ``ValueError`` (for example
    an unbalanced ``[`` in the host) is reported as invalid instead of
    propagating the exception.
    """
    # Import the submodule explicitly: a bare `import urllib` does not
    # guarantee that `urllib.parse` has been loaded.
    from urllib.parse import urlparse

    try:
        token = urlparse(long)
    except ValueError:
        return False
    return bool(token.scheme and token.netloc)
155

132ikl's avatar
132ikl committed
156
# Database connection functions
157
158
159
160
161
162
163
164
165
166
167
168


def get_db():
    """Return the SQLite connection stored on ``g``, opening it on first use.

    The database file is ``<database_name>.db``; the ``urls`` table is created
    if it does not exist yet when the connection is first opened.
    """
    if 'db' in g:
        return g.db

    db_path = current_app.config['database_name'] + '.db'
    g.db = sqlite3.connect(db_path, detect_types=sqlite3.PARSE_DECLTYPES)
    g.db.cursor().execute('CREATE TABLE IF NOT EXISTS urls (long,short)')
    return g.db


132ikl's avatar
132ikl committed
169
170
def query_db(query, args=(), one=False, row_factory=sqlite3.Row):
    """Run *query* with *args* and return the fetched rows.

    The connection's ``row_factory`` is set to *row_factory* before executing.
    With ``one=True`` only the first row is returned, or ``None`` when the
    result is empty; otherwise the full list of rows is returned.
    """
    db = get_db()
    db.row_factory = row_factory
    cursor = db.execute(query, args)
    rows = cursor.fetchall()
    cursor.close()
    if not one:
        return rows
    return rows[0] if rows else None


132ikl's avatar
132ikl committed
177
178
179
180
@app.teardown_appcontext
def close_db(error):
    """Close the SQLite connection opened by ``get_db`` when the app context ends.

    Args:
        error: the exception that ended the context, if any (unused).
    """
    # get_db() stores the connection as ``g.db``. The original looked for
    # ``g.sqlite_db``, which is never set, so connections were never closed.
    db = g.pop('db', None)
    if db is not None:
        db.close()
181
182


132ikl's avatar
132ikl committed
183
# Runs at import time: reads config.yml via load_config() and merges the
# resulting (lower-case) keys into Flask's config mapping.
app.config.update(load_config())  # Add YAML config to Flask config
132ikl's avatar
132ikl committed
184
# load_config() supplies no default for 'secret_key', so a config.yml that
# omits it raises KeyError on this line at import time.
app.secret_key = app.config['secret_key']
185
186
187


@app.route('/')
def main():
    """Serve the landing page: a successful reply with no result to report."""
    return response(rq=request, result=True)


@app.route('/<url>')
def main_redir(url):
    """Redirect a short name to its long URL with a 301.

    When the short name is unknown, an error is flashed and the visitor is
    sent back to the main page.
    """
    destination = get_long(url)
    if not destination:
        flash('Short URL "' + url + '" doesn\'t exist', 'error')
        return redirect(url_for('main'))
    return redirect(destination, 301)
199
200
201
202


def _delete_by_long(long):
    """Delete every row whose ``long`` column equals *long*; return how many rows were removed."""
    db = get_db()
    cursor = db.execute('DELETE FROM urls WHERE long = ?', (long,))
    db.commit()
    removed = cursor.rowcount
    cursor.close()
    return removed


@app.route('/', methods=['POST'])
def main_post():
    """Handle form posts: create a shortlink, or run an authenticated API command.

    * A non-blank ``long`` field creates (or returns an existing) shortlink,
      using the custom ``short`` field when given and a random name otherwise.
    * Otherwise an ``api`` field selects a command (``list``/``listshort``,
      ``listlong``, ``delete``); these require HTTP basic auth.
    * Anything else is answered with a "Long URL required" error.

    Every path returns whatever ``response`` (or a helper that wraps it) builds.
    """
    form = request.form
    # Check if long in form (ie. provided by curl) and not blank
    # (browsers always send blank forms as empty quote)
    long_url = form.get('long')
    if long_url:
        if not validate_long(long_url):
            return response(request, None, "Long URL is not valid")

        base_url = current_app.config['site_url'] or request.base_url
        # .get() so a client that omits 'short' entirely (eg. curl) is treated
        # like a blank field instead of triggering a 400 on the missing key.
        custom_short = form.get('short')

        if custom_short:
            # Validate the custom text against the allowed characters (once;
            # the original ran the validation twice).
            validation = validate_short(custom_short)
            if validation is not True:
                return validation
            short = custom_short
            if get_long(short) == long_url:
                return response(request, base_url + short,
                                'Error: Failed to return pre-existing non-random shortlink')
        else:
            # Reuse an existing random-length shortlink for this URL before
            # spending time generating a new one.
            long_exists = check_long_exist(long_url)
            if long_exists:
                return response(request, base_url + long_exists,
                                'Error: Failed to return pre-existing random shortlink')
            short = generate_short(request)
            # On timeout generate_short returns an error reply rather than a
            # string; pass it straight through instead of using it as a name.
            if not isinstance(short, str):
                return short

        if check_short_exist(short):
            return response(request, None,
                            'Short URL already taken')
        get_db().cursor().execute('INSERT INTO urls (long,short) VALUES (?,?)', (long_url, short))
        get_db().commit()
        return response(request, base_url + short,
                        'Error: Failed to generate')
    elif 'api' in form:
        # All API calls require authentication
        auth = request.authorization
        # Attribute access works whether or not the authorization object is
        # dict-like; missing credentials are rejected before authenticate().
        if not auth or auth.username is None or auth.password is None \
                or not authenticate(auth.username, auth.password):
            return response(request, None, "BaiscAuth failed")
        command = form['api']
        if command == 'list' or command == 'listshort':
            return response(request, list_shortlinks(), "Failed to list items")
        elif command == 'listlong':
            shortlinks = list_shortlinks()
            shortlinks = {v: k for k, v in shortlinks.items()}
            return response(request, shortlinks, "Failed to list items")
        elif command == 'delete':
            deleted = 0
            if 'long' not in form and 'short' not in form:
                return response(request, None, "Provide short or long in POST data")
            if 'short' in form:
                deleted += delete_url(form['short'])
            if 'long' in form:
                # Match on the long column; the original passed the long URL
                # to delete_url(), which compares against the short column.
                # NOTE(review): a non-blank 'long' is caught by the shortening
                # branch above, so this is only reached with a blank 'long' —
                # confirm the intended API before relying on delete-by-long.
                deleted += _delete_by_long(form['long'])
            if deleted > 0:
                return response(request, "Deleted " + str(deleted) + " URLs")
            else:
                return response(request, None, "Failed to delete URL")
        else:
            return response(request, None, 'Command ' + command + ' not found')
    else:
        return response(request, None, 'Long URL required')
258
259
260
261


# Start the application with app.run() when this file is executed directly
# rather than imported.
if __name__ == '__main__':
    app.run()