liteshort.py 10.8 KB
Newer Older
132ikl's avatar
132ikl committed
1
2
3
4
5
# Copyright (c) 2019 Steven Spangler <132@ikl.sh>
# This file is part of liteshort by 132ikl
# This software is licensed under the MIT license. It should be included in your copy of this software.
# A copy of the MIT license can be obtained at https://mit-license.org/

132ikl's avatar
132ikl committed
6
from flask import Flask, current_app, flash, g, jsonify, redirect, render_template, request, send_from_directory, url_for
7
import bcrypt
132ikl's avatar
132ikl committed
8
import os
9
10
11
import random
import sqlite3
import time
132ikl's avatar
132ikl committed
12
import urllib
13
14
import yaml

132ikl's avatar
132ikl committed
15
16
# Module-level Flask application; the route and teardown decorators below register against it.
app = Flask(__name__)

17
18

def load_config():
    """Load, normalise and validate ``config.yml`` from the working directory.

    Keys are lower-cased so the config is case insensitive, missing options
    are filled in from the built-in defaults, and every known option is
    checked against its expected type(s).

    Returns:
        dict: the validated configuration. Unless the API is disabled it also
        carries ``password_hashed``, telling ``check_password`` whether to use
        ``admin_hashed_password`` (True) or ``admin_password`` (False).

    Raises:
        TypeError: if a known option has the wrong type (this includes a
            missing ``secret_key``, whose default of None is not a str), or if
            the API is enabled and neither ``admin_password`` nor
            ``admin_hashed_password`` is set.
    """
    # safe_load only constructs plain YAML types (yaml.load without an explicit
    # Loader can build arbitrary Python objects and is rejected by PyYAML >= 6).
    # The context manager makes sure the file handle is closed.
    with open('config.yml') as config_file:
        new_config = yaml.safe_load(config_file) or {}  # An empty file parses to None
    new_config = {k.lower(): v for k, v in new_config.items()}  # Make config keys case insensitive

    req_options = {'admin_username': 'admin', 'database_name': "urls", 'random_length': 4,
                   'allowed_chars': 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_',
                   'random_gen_timeout': 5, 'site_name': 'liteshort', 'site_domain': None, 'show_github_link': True,
                   'secret_key': None, 'disable_api': False, 'subdomain': ''
                   }

    config_types = {'admin_username': str, 'database_name': str, 'random_length': int,
                    'allowed_chars': str, 'random_gen_timeout': int, 'site_name': str,
                    'site_domain': (str, type(None)), 'show_github_link': bool, 'secret_key': str,
                    'disable_api': bool, 'subdomain': (str, type(None))
                    }

    # Make sure everything in req_options is set in config
    for option, default in req_options.items():
        new_config.setdefault(option, default)

    for option, value in new_config.items():
        if option not in config_types:
            continue
        allowed_types = config_types[option]
        if not isinstance(allowed_types, tuple):
            allowed_types = (allowed_types,)  # Single types are treated as a one-element tuple
        # Exact type match on purpose: isinstance() would accept a bool where an int is required
        if type(value) not in allowed_types:
            raise TypeError(option + " is incorrect type")

    if not new_config['disable_api']:
        if new_config.get('admin_hashed_password'):
            new_config['password_hashed'] = True
        elif new_config.get('admin_password'):
            new_config['password_hashed'] = False
        else:
            raise TypeError('admin_password or admin_hashed_password must be set in config.yml')
    return new_config


132ikl's avatar
132ikl committed
58
59
60
61
62
63
64
65
66
67
68
69
def authenticate(username, password):
    """Check API credentials against the configured admin account.

    Returns False when the username does not match ``admin_username``;
    otherwise returns whatever ``check_password`` reports for the password.
    """
    config = current_app.config
    if username != config['admin_username']:
        return False
    return check_password(password, config)


def check_long_exist(long):
    """Look for an existing short code that already points at ``long``.

    Only codes no longer than the configured ``random_length`` are considered.
    Returns the first such code, or False when there is none.
    """
    rows = query_db('SELECT short FROM urls WHERE long = ?', (long,))
    for row in rows:
        if not row:
            continue
        candidate = row['short']
        # Skip entries whose short code is longer than a randomly generated one
        if len(candidate) <= current_app.config["random_length"]:
            return candidate
    return False


132ikl's avatar
132ikl committed
70
71
def check_short_exist(short):  # Allow to also check against a long link
    """Return True when ``get_long`` finds a long URL stored for ``short``, else False."""
    return bool(get_long(short))


76
77
78
79
80
81
82
83
84
def check_password(password, pass_config):
    """Verify ``password`` against the admin credentials in ``pass_config``.

    Args:
        password: the password supplied by the client.
        pass_config: mapping with ``password_hashed`` plus either
            ``admin_hashed_password`` (a bcrypt hash) or ``admin_password``
            (plaintext), depending on that flag.

    Returns:
        bool: True when the password matches.
    """
    import hmac  # Local import so this function does not depend on the module's import block

    if pass_config['password_hashed']:
        return bcrypt.checkpw(password.encode('utf-8'), pass_config['admin_hashed_password'].encode('utf-8'))

    stored = pass_config['admin_password']
    # A non-string on either side could never compare equal to a submitted password
    if not isinstance(password, str) or not isinstance(stored, str):
        return False
    # Constant-time comparison so response timing does not reveal how much of the password matched.
    # Encoding to bytes lets compare_digest accept non-ASCII passwords.
    return hmac.compare_digest(password.encode('utf-8'), stored.encode('utf-8'))


132ikl's avatar
132ikl committed
85
86
87
88
89
def delete_url(deletion):
    """Delete every row whose short code equals ``deletion``.

    Returns the number of rows that matched before the delete was issued.
    """
    # row_factory=None: return as tuple instead of row
    matches = query_db('SELECT * FROM urls WHERE short = ?', (deletion,), False, None)
    removed = len(matches)
    connection = get_db()
    connection.cursor().execute('DELETE FROM urls WHERE short = ?', (deletion,))
    connection.commit()
    return removed
90
91


132ikl's avatar
132ikl committed
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
def dict_factory(cursor, row):
    """Row factory that maps each column name in ``cursor.description`` to its value in ``row``."""
    return {column[0]: row[position] for position, column in enumerate(cursor.description)}


def generate_short(rq):
    """Generate a random short code that is not in use yet.

    Candidates of ``random_length`` characters are drawn from ``allowed_chars``
    until one is free. Once ``random_gen_timeout`` seconds have elapsed, the
    value of ``response(rq, None, ...)`` carrying a timeout error is returned
    instead of a short code.
    """
    deadline = time.time() + current_app.config['random_gen_timeout']
    while time.time() < deadline:
        length = current_app.config['random_length']
        candidate = ''.join(random.choice(current_app.config['allowed_chars'])
                            for _ in range(length))
        if not check_short_exist(candidate):
            return candidate
    return response(rq, None, 'Timeout while generating random short URL')


132ikl's avatar
132ikl committed
110
111
112
113
114
115
116
def get_long(short):
    """Return the long URL stored for ``short``, or None if there is no row or the value is empty."""
    match = query_db('SELECT long FROM urls WHERE short = ?', (short,), True)
    if not match:
        return None
    return match['long'] or None


132ikl's avatar
132ikl committed
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def list_shortlinks():
    """Return every row of ``urls`` as a dict keyed by first column, valued by second column."""
    # row_factory=None makes query_db hand back plain tuples
    rows = query_db('SELECT * FROM urls', (), False, None)
    return nested_list_to_dict(rows)


def nested_list_to_dict(l):
    """Build a dict from a sequence of pairs, using item 0 as key and item 1 as value.

    When a key occurs more than once the last pair wins.
    """
    return {pair[0]: pair[1] for pair in l}


def response(rq, result, error_msg="Error: Unknown error"):
    """Turn ``result`` into a reply in the format the request asked for.

    Args:
        rq: request whose ``form`` is consulted for the ``api`` and ``format`` fields.
        result: payload to report. A falsy value means failure; True means
            success with nothing to report (ie. during deletion).
        error_msg: message used when ``result`` is falsy.

    Returns:
        A plain string when ``api`` is set without ``format=json``, a JSON
        reply when ``format=json``, otherwise the rendered ``main.html`` with
        the outcome flashed.
    """
    wants_json = rq.form.get('format') == 'json'

    if rq.form.get('api') and not wants_json:
        return "Format type HTML (default) not support for API"  # Future-proof for non-json return types

    if wants_json:
        # A missing/falsy result is reported as an error, so callers can set
        # an error message without checking the result themselves
        if not result:
            return jsonify(success=False, error=error_msg)
        if result is True:  # Success with no payload
            return jsonify(success=True)
        return jsonify(success=True, result=result)

    if not result:
        flash(error_msg, 'error')
    elif result is not True:
        flash(result, 'success')
    return render_template("main.html")
132ikl's avatar
132ikl committed
151
152
153
154
155
156
157
158
159


def validate_short(short):
    """Check a custom short code against the configured ``allowed_chars``.

    Returns True when every character is allowed; otherwise returns the error
    reply built by ``response`` for the first offending character.
    """
    offending = next((ch for ch in short if ch not in current_app.config['allowed_chars']), None)
    if offending is None:
        return True
    return response(request, None,
                    'Character ' + offending + ' not allowed in short URL')

160

132ikl's avatar
132ikl committed
161
162
163
def validate_long(long):  # https://stackoverflow.com/a/36283503
    """Return True when ``long`` parses as a URL with both a scheme and a network location.

    Malformed input that ``urlparse`` rejects outright (for example an
    unterminated IPv6 literal such as ``http://[bad``) is reported as invalid
    instead of letting the ValueError propagate.
    """
    # Import the submodule explicitly: a bare "import urllib" does not guarantee urllib.parse is loaded
    from urllib.parse import urlparse

    try:
        token = urlparse(long)
    except ValueError:
        return False
    return all([token.scheme, token.netloc])
164

132ikl's avatar
132ikl committed
165
# Database connection functions
166
167
168
169
170
171
172
173
174
175
176
177


def get_db():
    """Return the SQLite connection cached on ``g`` as ``db``, opening it on first use.

    A new connection targets ``<database_name>.db`` and makes sure the
    ``urls`` table exists before it is handed out.
    """
    if 'db' in g:
        return g.db

    db_file = ''.join((current_app.config['database_name'], '.db'))
    connection = sqlite3.connect(db_file, detect_types=sqlite3.PARSE_DECLTYPES)
    g.db = connection
    connection.cursor().execute('CREATE TABLE IF NOT EXISTS urls (long,short)')
    return connection


132ikl's avatar
132ikl committed
178
179
def query_db(query, args=(), one=False, row_factory=sqlite3.Row):
    """Run ``query`` with ``args`` and return the fetched rows.

    Args:
        query: SQL text with ``?`` placeholders.
        args: values bound to the placeholders.
        one: when True return only the first row, or None if there are no rows.
        row_factory: assigned to the connection before the query runs; pass
            None to get plain tuples.

    Note that the row factory is set on the connection itself, so it stays in
    effect until something assigns a different one.
    """
    connection = get_db()
    connection.row_factory = row_factory
    cursor = connection.execute(query, args)
    rows = cursor.fetchall()
    cursor.close()
    if not one:
        return rows
    return rows[0] if rows else None


132ikl's avatar
132ikl committed
186
187
188
189
@app.teardown_appcontext
def close_db(error):
    """Close the SQLite connection when the application context is torn down.

    ``get_db`` caches the connection on ``g`` under the name ``db``, so that is
    the attribute that has to be released here (looking for ``sqlite_db``
    never matched and left every connection open).

    Args:
        error: value passed in by Flask's teardown hook; unused.
    """
    connection = g.pop('db', None)
    if connection is not None:
        connection.close()
190
191


132ikl's avatar
132ikl committed
192
# Runs at import time; load_config() raises TypeError on an invalid config.yml, which aborts start-up.
app.config.update(load_config())  # Add YAML config to Flask config
132ikl's avatar
132ikl committed
193
# Secret key taken from the YAML config; Flask needs one for the session that flash() relies on.
app.secret_key = app.config['secret_key']
194
# site_domain may be None (its default), in which case SERVER_NAME is left unset.
app.config['SERVER_NAME'] = app.config['site_domain']
195
196


132ikl's avatar
132ikl committed
197
198
199
200
201
202
@app.route('/favicon.ico', subdomain=app.config['subdomain'])
def favicon():
    """Serve ``favicon.ico`` from the application's ``static`` directory."""
    static_dir = os.path.join(app.root_path, 'static')
    return send_from_directory(static_dir, 'favicon.ico',
                               mimetype='image/vnd.microsoft.icon')


203
@app.route('/', subdomain=app.config['subdomain'])
def main():
    """Render the landing page."""
    # result=True asks response() for a plain success reply with no message attached
    page = response(request, True)
    return page


@app.route('/<url>')
def main_redir(url):
    """Redirect a short code to its long URL.

    Known codes get a 301 redirect to the stored URL. Unknown codes flash an
    error and redirect to the main page.
    """
    destination = get_long(url)
    if not destination:
        flash('Short URL "' + url + '" doesn\'t exist', 'error')
        return redirect(url_for('main'))
    return redirect(destination, 301)
216
217


218
def _short_link_url(short):
    """Return the public URL for ``short``, falling back to the request's base URL when no site_domain is set."""
    domain = current_app.config['site_domain']
    # TODO: un-hack-ify adding the protocol here
    base = ('https://' + domain + '/') if domain else request.base_url
    return base + short


def _is_short_code(candidate):
    """Return True when ``candidate`` has the shape of a randomly generated short code."""
    config = current_app.config
    return (isinstance(candidate, str)
            and len(candidate) == config['random_length']
            and all(ch in config['allowed_chars'] for ch in candidate))


def _delete_by_long(long):
    """Delete every row whose long URL equals ``long`` and return how many rows were removed."""
    connection = get_db()
    cursor = connection.execute('DELETE FROM urls WHERE long = ?', (long,))
    connection.commit()
    return cursor.rowcount


def _handle_api_command():
    """Authenticate and run the command named by the ``api`` form field."""
    if current_app.config['disable_api']:
        return response(request, None, "API is disabled.")

    # All API calls require authentication
    auth = request.authorization
    if not auth or not authenticate(auth.username, auth.password):
        return response(request, None, "BasicAuth failed")

    command = request.form['api']
    if command in ('list', 'listshort'):
        return response(request, list_shortlinks(), "Failed to list items")
    if command == 'listlong':
        inverted = {v: k for k, v in list_shortlinks().items()}
        return response(request, inverted, "Failed to list items")
    if command == 'delete':
        if 'long' not in request.form and 'short' not in request.form:
            return response(request, None, "Provide short or long in POST data")
        deleted = 0
        if 'short' in request.form:
            deleted += delete_url(request.form['short'])
        if 'long' in request.form:
            # delete_url() matches on the short column, so long URLs need their own query
            deleted += _delete_by_long(request.form['long'])
        if deleted > 0:
            return response(request, "Deleted " + str(deleted) + " URLs")
        return response(request, None, "Failed to delete URL")
    return response(request, None, 'Command ' + command + ' not found')


def _handle_shorten():
    """Create a short link for the ``long`` form field, or return one that already exists."""
    long = request.form['long']
    if not validate_long(long):
        return response(request, None, "Long URL is not valid")

    custom = request.form.get('short')
    if custom:
        # Validate the custom text once; a second call would flash the error twice
        verdict = validate_short(custom)
        if verdict is not True:
            return verdict
        short = custom
        if get_long(short) == long:
            return response(request, _short_link_url(short),
                            'Error: Failed to return pre-existing non-random shortlink')
        if check_short_exist(short):
            return response(request, None,
                            'Short URL already taken')
    else:
        # Reuse an existing random link before spending time generating a new one
        existing = check_long_exist(long)
        if existing:
            return response(request, _short_link_url(existing),
                            'Error: Failed to return pre-existing random shortlink')
        short = generate_short(request)
        if not _is_short_code(short):
            # generate_short() hands back its timeout reply instead of a code; pass it on
            return short

    connection = get_db()
    connection.cursor().execute('INSERT INTO urls (long,short) VALUES (?,?)', (long, short))
    connection.commit()
    return response(request, _short_link_url(short),
                    'Error: Failed to generate')


@app.route('/', methods=['POST'], subdomain=app.config['subdomain'])
def main_post():
    """Handle POSTs to the main page: API commands and link creation.

    A request carrying an ``api`` field is treated as an authenticated API
    command (``list``/``listshort``, ``listlong``, ``delete``). It is checked
    first so that ``delete`` can be given a ``long`` value; with the previous
    order such a request created a new link instead. Any other request with a
    ``long`` field creates a short link, optionally using the custom ``short``
    text. Everything else is answered with a 'Long URL required' error.
    """
    if request.form.get('api'):
        return _handle_api_command()
    if request.form.get('long'):
        return _handle_shorten()
    return response(request, None, 'Long URL required')
277
278
279
280


# Start Flask's built-in server only when this file is executed directly, not when it is imported.
if __name__ == '__main__':
    app.run()