# Init
import os
import signal
import atexit
import logging
from route.tool.func import *
from route import *
from waitress import serve
from werkzeug.middleware.proxy_fix import ProxyFix
args = sys.argv
run_mode = ''
if len(args) > 1:
run_mode = args[1]
if not run_mode in ['dev']:
run_mode = ''
# Init-Version
with open('version.json', encoding = 'utf8') as file_data:
version_list = json_loads(file_data.read())
# Init-DB
data_db_set = class_check_json()
do_db_set(data_db_set)
with get_db_connect(init_mode = True) as conn:
curs = conn.cursor()
setup_tool = ''
try:
curs.execute(db_change('select data from other where name = "ver"'))
except:
setup_tool = 'init'
if setup_tool != 'init':
ver_set_data = curs.fetchall()
if ver_set_data:
if int(version_list['c_ver']) > int(ver_set_data[0][0]):
setup_tool = 'update'
else:
setup_tool = 'normal'
else:
setup_tool = 'init'
if run_mode != 'dev':
file_name = linux_exe_chmod()
local_file_path = os.path.join("bin", file_name)
if not (setup_tool == "normal" and os.path.exists(local_file_path)):
if os.path.exists(local_file_path):
print('Remove Old Binary')
os.remove(local_file_path)
download_url = version_list["bin_link"] + file_name
print('Download New Binary File')
response = requests.get(download_url, stream = True)
if response.status_code == 200:
with open(local_file_path, 'wb') as file:
for chunk in response.iter_content(chunk_size = 8192):
file.write(chunk)
print('Complete Download')
if data_db_set['type'] == 'mysql':
try:
curs.execute(db_change('create database ' + data_db_set['name'] + ' default character set utf8mb4'))
except:
try:
curs.execute(db_change('alter database ' + data_db_set['name'] + ' character set utf8mb4'))
except:
pass
conn.select_db(data_db_set['name'])
else:
conn.execute('pragma journal_mode = WAL')
if setup_tool != 'normal':
create_data = get_db_table_list()
for create_table in create_data:
for create in ['test'] + create_data[create_table]:
db_pass = 0
try:
curs.execute(db_change('select ' + create + ' from ' + create_table + ' limit 1'))
db_pass = 1
except:
pass
field_text = 'longtext' if data_db_set['type'] == 'mysql' else 'text'
if db_pass == 0:
try:
curs.execute(db_change('create table ' + create_table + '(test ' + field_text + ' default (""))'))
db_pass = 1
except Exception as e:
# print(e)
pass
if db_pass == 0:
try:
curs.execute(db_change('create table ' + create_table + '(test ' + field_text + ' default "")'))
db_pass = 1
except Exception as e:
# print(e)
pass
if db_pass == 0:
try:
curs.execute(db_change('create table ' + create_table + '(test ' + field_text + ')'))
db_pass = 1
except Exception as e:
# print(e)
pass
if db_pass == 0:
try:
curs.execute(db_change("alter table " + create_table + " add column " + create + " " + field_text + " default ('')"))
db_pass = 1
except Exception as e:
# print(e)
pass
if db_pass == 0:
try:
curs.execute(db_change("alter table " + create_table + " add column " + create + " " + field_text + " default ''"))
db_pass = 1
except Exception as e:
# print(e)
pass
if db_pass == 0:
try:
curs.execute(db_change("alter table " + create_table + " add column " + create + " " + field_text))
db_pass = 1
except Exception as e:
# print(e)
pass
if db_pass == 0:
raise
try:
curs.execute(db_change("create index history_index on history (title, ip)"))
except:
pass
if setup_tool == 'update':
try:
loop = asyncio.get_running_loop()
loop.create_task(update(conn, int(ver_set_data[0][0]), data_db_set))
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(update(conn, int(ver_set_data[0][0]), data_db_set))
else:
set_init(conn)
set_init_always(conn, version_list['c_ver'], run_mode)
# Init-Route
class EverythingConverter(werkzeug.routing.PathConverter):
def __init__(self, map):
super(EverythingConverter, self).__init__(map)
self.regex = r'.*?'
def to_python(self, value):
return re.sub(r'^\\\.', '.', value)
class RegexConverter(werkzeug.routing.BaseConverter):
def __init__(self, url_map, *items):
super(RegexConverter, self).__init__(url_map)
self.regex = items[0]
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
app = flask.Flask(__name__, template_folder = os.path.join(BASE_DIR, "views"))
app.config['JSON_AS_ASCII'] = False
app.config['JSONIFY_PRETTYPRINT_REGULAR'] = False
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 3600
if run_mode == 'dev':
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config['DEBUG'] = True
app.config['ENV'] = 'development'
log = logging.getLogger('waitress')
log.setLevel(logging.ERROR)
app.url_map.converters['everything'] = EverythingConverter
app.url_map.converters['regex'] = RegexConverter
curs.execute(db_change('select data from other where name = "key"'))
sql_data = curs.fetchall()
app.secret_key = sql_data[0][0]
# Init-DB_Data
server_set = {}
server_set_var = get_init_set_list()
server_set_env = {
'host' : os.getenv('NAMU_HOST'),
'golang_port' : os.getenv('NAMU_GOLANGPORT'),
'port' : os.getenv('NAMU_PORT'),
'language' : os.getenv('NAMU_LANG'),
'markup' : os.getenv('NAMU_MARKUP'),
'encode' : os.getenv('NAMU_ENCRYPT')
}
for i in server_set_var:
curs.execute(db_change('select data from other where name = ?'), [i])
server_set_val = curs.fetchall()
if server_set_val:
server_set_val = server_set_val[0][0]
elif server_set_env[i] != None:
server_set_val = server_set_env[i]
curs.execute(db_change('insert into other (name, data, coverage) values (?, ?, "")'), [i, server_set_env[i]])
else:
if 'list' in server_set_var[i]:
print(server_set_var[i]['display'] + ' (' + server_set_var[i]['default'] + ') [' + ', '.join(server_set_var[i]['list']) + ']' + ' : ', end = '')
else:
print(server_set_var[i]['display'] + ' (' + server_set_var[i]['default'] + ') : ', end = '')
server_set_val = input()
if server_set_val == '':
server_set_val = server_set_var[i]['default']
elif server_set_var[i]['require'] == 'select':
if not server_set_val in server_set_var[i]['list']:
server_set_val = server_set_var[i]['default']
curs.execute(db_change('insert into other (name, data, coverage) values (?, ?, "")'), [i, server_set_val])
print(server_set_var[i]['display'] + ' : ' + server_set_val)
server_set[i] = server_set_val
for for_a in server_set:
global_some_set_do('setup_' + for_a, server_set[for_a])
###
async def golang_process_check():
while True:
try:
other_set_temp = {}
for k in data_db_set:
other_set_temp["db_" + k] = data_db_set[k]
other_set = {
"url" : "test",
"data" : json_dumps(other_set_temp),
"session" : "{}",
"cookies" : "",
"ip" : "127.0.0.1"
}
response = requests.post('http://localhost:' + server_set["golang_port"] + '/compatible_api/test', data = json_dumps(other_set))
if response.status_code == 200:
print('Golang turn on')
break
except requests.ConnectionError:
print('Wait golang...')
time.sleep(1)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
BIN_DIR = os.path.join(BASE_DIR, "bin")
exe_name = linux_exe_chmod()
exe_path = os.path.join(BIN_DIR, exe_name)
cmd = [exe_path, server_set["golang_port"], run_mode, 'api']
golang_process = subprocess.Popen(cmd, cwd = BIN_DIR)
try:
loop = asyncio.get_running_loop()
loop.create_task(golang_process_check())
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(golang_process_check())
###
def back_up(data_db_set):
with get_db_connect() as conn:
curs = conn.cursor()
try:
curs.execute(db_change('select data from other where name = "back_up"'))
back_time = curs.fetchall()
back_time = float(number_check(back_time[0][0], True)) if back_time and back_time[0][0] != '' else 0
curs.execute(db_change('select data from other where name = "backup_count"'))
back_up_count = curs.fetchall()
back_up_count = int(number_check(back_up_count[0][0])) if back_up_count and back_up_count[0][0] != '' else 3
if back_time != 0:
curs.execute(db_change('select data from other where name = "backup_where"'))
back_up_where = curs.fetchall()
back_up_where = back_up_where[0][0] if back_up_where and back_up_where[0][0] != '' else data_db_set['name'] + '.db'
print('Back up state : ' + str(back_time) + ' hours')
print('Back up directory : ' + back_up_where)
if back_up_count != 0:
print('Back up max number : ' + str(back_up_count))
file_dir = os.path.split(back_up_where)[0]
file_dir = '.' if file_dir == '' else file_dir
file_name = os.path.split(back_up_where)[1]
file_name = re.sub(r'\.db$', '_[0-9]{14}.db', file_name)
backup_file = [for_a for for_a in os.listdir(file_dir) if re.search('^' + file_name + '$', for_a)]
backup_file = sorted(backup_file)
if len(backup_file) >= back_up_count:
remove_dir = os.path.join(file_dir, backup_file[0])
os.remove(remove_dir)
print('Back up : Remove (' + remove_dir + ')')
now_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
new_file_name = re.sub(r'\.db$', '_' + now_time + '.db', back_up_where)
shutil.copyfile(
data_db_set['name'] + '.db',
new_file_name
)
print('Back up : OK (' + new_file_name + ')')
else:
print('Back up state : Turn off')
back_time = 1
except Exception as e:
print('Back up : Error')
print(e)
back_time = 1
threading.Timer(60 * 60 * back_time, back_up, [data_db_set]).start()
async def do_every_day():
with get_db_connect() as conn:
curs = conn.cursor()
# 오늘의 날짜 불러오기
time_today = get_time().split()[0]
# vote 관리
curs.execute(db_change('select id, type from vote where type = "open" or type = "n_open"'))
for for_a in curs.fetchall():
curs.execute(db_change('select data from vote where id = ? and name = "end_date" and type = "option"'), [for_a[0]])
db_data = curs.fetchall()
if db_data:
time_db = db_data[0][0].split()[0]
if time_today > time_db:
curs.execute(db_change("update vote set type = ? where user = '' and id = ? and type = ?"), ['close' if for_a[1] == 'open' else 'n_close', for_a[0], for_a[1]])
# ban 관리
curs.execute(db_change("update rb set ongoing = '' where end < ? and end != '' and ongoing = '1'"), [get_time()])
# auth 관리
curs.execute(db_change('select id, data from user_set where name = "auth_date"'))
db_data = curs.fetchall()
for for_a in db_data:
time_db = for_a[1].split()[0]
if time_today > time_db:
curs.execute(db_change("update user_set set data = 'user' where id = ? and name = 'acl'"), [for_a[0]])
curs.execute(db_change('delete from user_set where name = "auth_date" and id = ?'), [for_a[0]])
# acl 관리
curs.execute(db_change("select doc_name, doc_rev, set_data from data_set where set_name = 'acl_date'"))
db_data = curs.fetchall()
for for_a in db_data:
time_db = for_a[2].split()[0]
if time_today > time_db:
curs.execute(db_change("delete from acl where title = ? and type = ?"), [for_a[0], for_a[1]])
curs.execute(db_change("delete from data_set where doc_name = ? and doc_rev = ? and set_name = 'acl_date'"), [for_a[0], for_a[1]])
# ua 관리
curs.execute(db_change('select data from other where name = "ua_expiration_date"'))
db_data = curs.fetchall()
if db_data and db_data[0][0] != '':
time_db = int(number_check(db_data[0][0]))
time_calc = datetime.date.today() - datetime.timedelta(days = time_db)
time_calc = time_calc.strftime('%Y-%m-%d %H:%M:%S')
curs.execute(db_change("delete from ua_d where today < ?"), [time_calc])
# auth history 관리
curs.execute(db_change('select data from other where name = "auth_history_expiration_date"'))
db_data = curs.fetchall()
if db_data and db_data[0][0] != '':
time_db = int(number_check(db_data[0][0]))
time_calc = datetime.date.today() - datetime.timedelta(days = time_db)
time_calc = time_calc.strftime('%Y-%m-%d %H:%M:%S')
curs.execute(db_change("delete from re_admin where time < ?"), [time_calc])
# 사이트맵 생성 관리
curs.execute(db_change('select data from other where name = "sitemap_auto_make"'))
db_data = curs.fetchall()
if db_data and db_data[0][0] != '':
await main_setting_sitemap(1)
print('Make sitemap')
# 칭호 관리
curs.execute(db_change("select id from user_set where name = 'user_title' and data = '✅'"))
for for_a in curs.fetchall():
if await acl_check('', 'all_admin_auth', '', for_a[0]) == 1:
curs.execute(db_change("update user_set set data = '☑️' where name = 'user_title' and data = '✅' and id = ?"), [for_a[0]])
threading.Timer(60 * 60 * 24, do_every_day).start()
def auto_do_something(data_db_set):
if data_db_set['type'] == 'sqlite':
back_up(data_db_set)
try:
loop = asyncio.get_running_loop()
loop.create_task(do_every_day())
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(do_every_day())
auto_do_something(data_db_set)
print('Now running... http://localhost:' + server_set['port'])
@app.before_request
def before_request_func():
db_data = global_some_set_do('wiki_access_password')
if db_data and db_data != '':
access_password = db_data
input_password = flask.request.cookies.get('opennamu_wiki_access', ' ')
if url_pas(access_password) != input_password:
with get_db_connect() as conn:
return '''
''' + load_lang('error_password_require_for_wiki_access') + '''
'''
# Init-custom
if os.path.exists('custom.py'):
from custom import custom_run
custom_run('error', app)
# Route_Go_Func
import itertools
_golang_view_seq = itertools.count()
def golang_view():
idx = next(_golang_view_seq)
async def _view(**_):
return await python_to_golang("same")
_view.__name__ = f"_golang_view_{idx}"
return _view
# Func
# Func-inter_wiki
app.route('/filter/inter_wiki', defaults = { 'tool' : 'inter_wiki' })(filter_all)
app.route('/filter/inter_wiki/add', methods = ['POST', 'GET'], defaults = { 'tool' : 'inter_wiki' })(filter_all_add)
app.route('/filter/inter_wiki/add/', methods = ['POST', 'GET'], defaults = { 'tool' : 'inter_wiki' })(filter_all_add)
app.route('/filter/inter_wiki/del/', defaults = { 'tool' : 'inter_wiki' })(filter_all_delete)
app.route('/filter/outer_link', defaults = { 'tool' : 'outer_link' })(filter_all)
app.route('/filter/outer_link/add', methods = ['POST', 'GET'], defaults = { 'tool' : 'outer_link' })(filter_all_add)
app.route('/filter/outer_link/add/', methods = ['POST', 'GET'], defaults = { 'tool' : 'outer_link' })(filter_all_add)
app.route('/filter/outer_link/del/', defaults = { 'tool' : 'outer_link' })(filter_all_delete)
app.route('/filter/document', defaults = { 'tool' : 'document' })(filter_all)
app.route('/filter/document/add', methods = ['POST', 'GET'], defaults = { 'tool' : 'document' })(filter_all_add)
app.route('/filter/document/add/', methods = ['POST', 'GET'], defaults = { 'tool' : 'document' })(filter_all_add)
app.route('/filter/document/del/', defaults = { 'tool' : 'document' })(filter_all_delete)
app.route('/filter/edit_top', defaults = { 'tool' : 'edit_top' })(filter_all)
app.route('/filter/edit_top/add', methods = ['POST', 'GET'], defaults = { 'tool' : 'edit_top' })(filter_all_add)
app.route('/filter/edit_top/add/', methods = ['POST', 'GET'], defaults = { 'tool' : 'edit_top' })(filter_all_add)
app.route('/filter/edit_top/del/', defaults = { 'tool' : 'edit_top' })(filter_all_delete)
app.route('/filter/image_license', defaults = { 'tool' : 'image_license' })(filter_all)
app.route('/filter/image_license/add', methods = ['POST', 'GET'], defaults = { 'tool' : 'image_license' })(filter_all_add)
app.route('/filter/image_license/del/', defaults = { 'tool' : 'image_license' })(filter_all_delete)
app.route('/filter/template', defaults = { 'tool' : 'template' })(filter_all)
app.route('/filter/template/add', methods = ['POST', 'GET'], defaults = { 'tool' : 'template' })(filter_all_add)
app.route('/filter/template/add/', methods = ['POST', 'GET'], defaults = { 'tool' : 'template' })(filter_all_add)
app.route('/filter/template/del/', defaults = { 'tool' : 'template' })(filter_all_delete)
app.route('/filter/edit_filter', defaults = { 'tool' : 'edit_filter' })(filter_all)
app.route('/filter/edit_filter/add', methods = ['POST', 'GET'], defaults = { 'tool' : 'edit_filter' })(filter_all_add)
app.route('/filter/edit_filter/add/', methods = ['POST', 'GET'], defaults = { 'tool' : 'edit_filter' })(filter_all_add)
app.route('/filter/edit_filter/del/', defaults = { 'tool' : 'edit_filter' })(filter_all_delete)
app.route('/filter/email_filter', defaults = { 'tool' : 'email_filter' })(filter_all)
app.route('/filter/email_filter/add', methods = ['POST', 'GET'], defaults = { 'tool' : 'email_filter' })(filter_all_add)
app.route('/filter/email_filter/del/', defaults = { 'tool' : 'email_filter' })(filter_all_delete)
app.route('/filter/file_filter', defaults = { 'tool' : 'file_filter' })(filter_all)
app.route('/filter/file_filter/add', methods = ['POST', 'GET'], defaults = { 'tool' : 'file_filter' })(filter_all_add)
app.route('/filter/file_filter/del/', defaults = { 'tool' : 'file_filter' })(filter_all_delete)
app.route('/filter/name_filter', defaults = { 'tool' : 'name_filter' })(filter_all)
app.route('/filter/name_filter/add', methods = ['POST', 'GET'], defaults = { 'tool' : 'name_filter' })(filter_all_add)
app.route('/filter/name_filter/del/', defaults = { 'tool' : 'name_filter' })(filter_all_delete)
app.route('/filter/extension_filter', defaults = { 'tool' : 'extension_filter' })(filter_all)
app.route('/filter/extension_filter/add', methods = ['POST', 'GET'], defaults = { 'tool' : 'extension_filter' })(filter_all_add)
app.route('/filter/extension_filter/del/', defaults = { 'tool' : 'extension_filter' })(filter_all_delete)
# Func-list
app.route('/list/document/old', defaults = { 'set_type' : 'old' })(list_old_page)
app.route('/list/document/old/', defaults = { 'set_type' : 'old' })(list_old_page)
app.route('/list/document/new', defaults = { 'set_type' : 'new' })(list_old_page)
app.route('/list/document/new/', defaults = { 'set_type' : 'new' })(list_old_page)
app.route('/list/document/no_link')(list_no_link)
app.route('/list/document/no_link/')(list_no_link)
app.route('/list/document/acl')(list_acl)
app.route('/list/document/acl/')(list_acl)
app.route('/list/document/need')(list_please)
app.route('/list/document/need/')(list_please)
app.route('/list/document/all')(list_title_index)
app.route('/list/document/all/')(list_title_index)
app.route('/list/document/long')(list_long_page)
app.route('/list/document/long/')(list_long_page)
app.route('/list/document/short', defaults = { 'tool' : 'short_page' })(list_long_page)
app.route('/list/document/short/', defaults = { 'tool' : 'short_page' })(list_long_page)
app.route('/list/file')(list_image_file)
app.route('/list/file/')(list_image_file)
app.route('/list/image', defaults = { 'do_type' : 1 })(list_image_file)
app.route('/list/image/', defaults = { 'do_type' : 1 })(list_image_file)
app.route('/list/admin')(list_admin)
app.route('/list/admin/auth_use', methods = ['POST', 'GET'])(list_admin_auth_use)
app.route('/list/admin/auth_use_page//', methods = ['POST', 'GET'])(list_admin_auth_use)
app.route('/list/user')(list_user)
app.route('/list/user/')(list_user)
app.route('/list/user/check_submit/')(list_user_check_submit)
app.route('/list/user/check/')(list_user_check)
app.route('/list/user/check//')(list_user_check)
app.route('/list/user/check///')(list_user_check)
app.route('/list/user/check////')(list_user_check)
app.route('/list/user/check/delete///