liteshort.py 10.4 KB
Newer Older
132ikl's avatar
132ikl committed
1
2
3
4
5
# Copyright (c) 2019 Steven Spangler <132@ikl.sh>
# This file is part of liteshort by 132ikl
# This software is license under the MIT license. It should be included in your copy of this software.
# A copy of the MIT license can be obtained at https://mit-license.org/

132ikl's avatar
132ikl committed
6
from flask import Flask, current_app, flash, g, jsonify, redirect, render_template, request, url_for
7
8
9
10
import bcrypt
import random
import sqlite3
import time
132ikl's avatar
132ikl committed
11
import urllib
12
13
import yaml

132ikl's avatar
132ikl committed
14
15
app = Flask(__name__)

16
17

def load_config():
18
19
20
21
22
    new_config = yaml.load(open('config.yml'))
    new_config = {k.lower(): v for k, v in new_config.items()}  # Make config keys case insensitive

    req_options = {'admin_username': 'admin', 'database_name': "urls", 'random_length': 4,
                   'allowed_chars': 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_',
132ikl's avatar
132ikl committed
23
                   'random_gen_timeout': 5, 'site_name': 'liteshort', 'site_url': None, 'show_github_link': True,
132ikl's avatar
132ikl committed
24
                   'secret_key': None, 'disable_api': False
25
26
27
                   }

    config_types = {'admin_username': str, 'database_name': str, 'random_length': int,
132ikl's avatar
132ikl committed
28
                    'allowed_chars': str, 'random_gen_timeout': int, 'site_name': str,
132ikl's avatar
132ikl committed
29
30
31
                    'site_url': (str, type(None)), 'show_github_link': bool, 'secret_key': str,
                    'disable_api': bool
                    }
32
33
34
35
36
37
38

    for option in req_options.keys():
        if option not in new_config.keys():  # Make sure everything in req_options is set in config
            new_config[option] = req_options[option]

    for option in new_config.keys():
        if option in config_types:
132ikl's avatar
132ikl committed
39
40
41
42
43
44
45
46
            matches = False
            if type(config_types[option]) is not tuple:
                config_types[option] = (config_types[option],)  # Automatically creates tuple for non-tuple types
            for req_type in config_types[option]:  # Iterates through tuple to allow multiple types for config options
                if type(new_config[option]) is req_type:
                    matches = True
            if not matches:
                raise TypeError(option + " is incorrect type")
132ikl's avatar
132ikl committed
47
48
49
50
51
52
53
    if not new_config['disable_api']:
        if 'admin_hashed_password' in new_config.keys():  # Sets config value to see if bcrypt is required to check password
            new_config['password_hashed'] = True
        elif 'admin_password' in new_config.keys():
            new_config['password_hashed'] = False
        else:
            raise TypeError('admin_password or admin_hashed_password must be set in config.yml')
54
55
56
    return new_config


132ikl's avatar
132ikl committed
57
58
59
60
61
62
63
64
65
66
67
68
def authenticate(username, password):
    return username == current_app.config['admin_username'] and check_password(password, current_app.config)


def check_long_exist(long):
    query = query_db('SELECT short FROM urls WHERE long = ?', (long,))
    for i in query:
        if i and (len(i['short']) <= current_app.config["random_length"]):  # Checks if query if pre-existing URL is same as random length URL
            return i['short']
    return False


132ikl's avatar
132ikl committed
69
70
def check_short_exist(short):  # Allow to also check against a long link
    if get_long(short):
132ikl's avatar
132ikl committed
71
72
73
74
        return True
    return False


75
76
77
78
79
80
81
82
83
def check_password(password, pass_config):
    if pass_config['password_hashed']:
        return bcrypt.checkpw(password.encode('utf-8'), pass_config['admin_hashed_password'].encode('utf-8'))
    elif not pass_config['password_hashed']:
        return password == pass_config['admin_password']
    else:
        raise RuntimeError('This should never occur! Bailing...')


132ikl's avatar
132ikl committed
84
85
86
87
88
def delete_url(deletion):
    result = query_db('SELECT * FROM urls WHERE short = ?', (deletion,), False, None)  # Return as tuple instead of row
    get_db().cursor().execute('DELETE FROM urls WHERE short = ?', (deletion,))
    get_db().commit()
    return len(result)
89
90


132ikl's avatar
132ikl committed
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d


def generate_short(rq):
    timeout = time.time() + current_app.config['random_gen_timeout']
    while True:
        if time.time() >= timeout:
            return response(rq, None, 'Timeout while generating random short URL')
        short = ''.join(random.choice(current_app.config['allowed_chars'])
                        for i in range(current_app.config['random_length']))
        if not check_short_exist(short):
            return short


132ikl's avatar
132ikl committed
109
110
111
112
113
114
115
def get_long(short):
    row = query_db('SELECT long FROM urls WHERE short = ?', (short,), True)
    if row and row['long']:
        return row['long']
    return None


132ikl's avatar
132ikl committed
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def list_shortlinks():
    result = query_db('SELECT * FROM urls', (), False, None)
    result = nested_list_to_dict(result)
    return result


def nested_list_to_dict(l):
    d = {}
    for nl in l:
            d[nl[0]] = nl[1]
    return d


def response(rq, result, error_msg="Error: Unknown error"):
    if 'api' in rq.form and 'format' not in rq.form:
        return "Format type HTML (default) not support for API"  # Future-proof for non-json return types
    if 'format' in rq.form and rq.form['format'] == 'json':
        # If not result provided OR result doesn't exist, send error
        # Allows for setting an error message with explicitly checking in regular code
        if result:
            if result is True:  # Allows sending with no result (ie. during deletion)
                return jsonify(success=True)
            else:
                return jsonify(success=True, result=result)
        else:
            return jsonify(success=False, error=error_msg)
    else:
132ikl's avatar
132ikl committed
143
144
145
146
147
148
149
        if result and result is not True:
            flash(result, 'success')
            return render_template("main.html")
        elif not result:
            flash(error_msg, 'error')
            return render_template("main.html")
        return render_template("main.html")
132ikl's avatar
132ikl committed
150
151
152
153
154
155
156
157
158


def validate_short(short):
    for char in short:
        if char not in current_app.config['allowed_chars']:
            return response(request, None,
                            'Character ' + char + ' not allowed in short URL')
    return True

159

132ikl's avatar
132ikl committed
160
161
162
def validate_long(long):  # https://stackoverflow.com/a/36283503
    token = urllib.parse.urlparse(long)
    return all([token.scheme, token.netloc])
163

132ikl's avatar
132ikl committed
164
# Database connection functions
165
166
167
168
169
170
171
172
173
174
175
176


def get_db():
    if 'db' not in g:
        g.db = sqlite3.connect(
            ''.join((current_app.config['database_name'], '.db')),
            detect_types=sqlite3.PARSE_DECLTYPES
        )
        g.db.cursor().execute('CREATE TABLE IF NOT EXISTS urls (long,short)')
    return g.db


132ikl's avatar
132ikl committed
177
178
def query_db(query, args=(), one=False, row_factory=sqlite3.Row):
    get_db().row_factory = row_factory
179
180
181
182
183
184
    cur = get_db().execute(query, args)
    rv = cur.fetchall()
    cur.close()
    return (rv[0] if rv else None) if one else rv


132ikl's avatar
132ikl committed
185
186
187
188
@app.teardown_appcontext
def close_db(error):
    if hasattr(g, 'sqlite_db'):
        g.sqlite_db.close()
189
190


132ikl's avatar
132ikl committed
191
app.config.update(load_config())  # Add YAML config to Flask config
132ikl's avatar
132ikl committed
192
app.secret_key = app.config['secret_key']
193
194
195


@app.route('/')
196
def main():
132ikl's avatar
132ikl committed
197
198
199
200
201
202
203
204
205
206
    return response(request, True)


@app.route('/<url>')
def main_redir(url):
    long = get_long(url)
    if long:
        return redirect(long, 301)
    flash('Short URL "' + url + '" doesn\'t exist', 'error')
    return redirect(url_for('main'))
207
208
209
210


@app.route('/', methods=['POST'])
def main_post():
132ikl's avatar
132ikl committed
211
    # Check if long in form (ie. provided by curl) and not blank (browsers always send blank forms as empty quote)
212
    if 'long' in request.form and request.form['long']:
132ikl's avatar
132ikl committed
213
214
        if not validate_long(request.form['long']):
            return response(request, None, "Long URL is not valid")
215
        if 'short' in request.form and request.form['short']:
132ikl's avatar
132ikl committed
216
217
218
219
220
221
            # Validate long as URL and short custom text against allowed characters
            result = validate_short(request.form['short'])
            if validate_short(request.form['short']) is True:
                short = request.form['short']
            else:
                return result
132ikl's avatar
132ikl committed
222
            if get_long(short) == request.form['long']:
132ikl's avatar
132ikl committed
223
224
                return response(request, (current_app.config['site_url'] or request.base_url) + short,
                                'Error: Failed to return pre-existing non-random shortlink')
225
        else:
132ikl's avatar
132ikl committed
226
            short = generate_short(request)
132ikl's avatar
132ikl committed
227
        if check_short_exist(short):
132ikl's avatar
132ikl committed
228
            return response(request, None,
132ikl's avatar
132ikl committed
229
                            'Short URL already taken')
230
        long_exists = check_long_exist(request.form['long'])
132ikl's avatar
132ikl committed
231
        if long_exists and not request.form['short']:
132ikl's avatar
132ikl committed
232
233
234
235
236
237
238
            return response(request, (current_app.config['site_url'] or request.base_url) + long_exists,
                            'Error: Failed to return pre-existing random shortlink')
        get_db().cursor().execute('INSERT INTO urls (long,short) VALUES (?,?)', (request.form['long'], short))
        get_db().commit()
        return response(request, (current_app.config['site_url'] or request.base_url) + short,
                        'Error: Failed to generate')
    elif 'api' in request.form:
132ikl's avatar
132ikl committed
239
240
        if current_app.config['disable_api']:
            return response(request, None, "API is disabled.")
132ikl's avatar
132ikl committed
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
        # All API calls require authentication
        if not request.authorization \
                or not authenticate(request.authorization['username'], request.authorization['password']):
            return response(request, None, "BaiscAuth failed")
        command = request.form['api']
        if command == 'list' or command == 'listshort':
            return response(request, list_shortlinks(), "Failed to list items")
        elif command == 'listlong':
            shortlinks = list_shortlinks()
            shortlinks = {v: k for k, v in shortlinks.items()}
            return response(request, shortlinks, "Failed to list items")
        elif command == 'delete':
            deleted = 0
            if 'long' not in request.form and 'short' not in request.form:
                return response(request, None, "Provide short or long in POST data")
            if 'short' in request.form:
                deleted = delete_url(request.form['short']) + deleted
            if 'long' in request.form:
                deleted = delete_url(request.form['long']) + deleted
            if deleted > 0:
                return response(request, "Deleted " + str(deleted) + " URLs")
            else:
                return response(request, None, "Failed to delete URL")
        else:
            return response(request, None, 'Command ' + command + ' not found')
266
    else:
132ikl's avatar
132ikl committed
267
        return response(request, None, 'Long URL required')
268
269
270
271


if __name__ == '__main__':
    app.run()