' + load_lang('error') + '
' + \ '- ' + \
'
- ' + data + ' (' + load_lang('main_skin_set') + ') ' + \ '
# Init import os import sys import platform import json import smtplib import zipfile import shutil import logging import random import email.mime.text import email.utils import email.header import urllib.request # Init-Version version_list = json.loads(open('version.json', encoding = 'utf8').read()) print('Version : ' + version_list['beta']['r_ver']) print('DB set version : ' + version_list['beta']['c_ver']) print('Skin set version : ' + version_list['beta']['s_ver']) print('----') # Init-PIP_Install data_up_date = 1 if os.path.exists(os.path.join('data', 'version.json')): data_load_ver = open(os.path.join('data', 'version.json'), encoding = 'utf8').read() if data_load_ver == version_list['beta']['r_ver']: data_up_date = 0 if data_up_date == 1: with open(os.path.join('data', 'version.json'), 'w', encoding = 'utf8') as f: f.write(version_list['beta']['r_ver']) if platform.system() in ('Linux', 'Windows'): if platform.python_implementation() == 'PyPy': os.system( 'pypy' + ('3' if platform.system() != 'Windows' else '') + ' ' + \ '-m pip install --upgrade --user -r requirements.txt' ) else: os.system( 'python' + ('3' if platform.system() != 'Windows' else '') + ' ' + \ '-m pip install --upgrade --user -r requirements.txt' ) print('----') try: os.execl(sys.executable, sys.executable, *sys.argv) except: pass try: os.execl(sys.executable, '"' + sys.executable + '"', *sys.argv) except: print('Error : restart failed') raise else: print('Error : automatic installation is not supported.') print('Help : try "python3 -m pip install -r requirements.txt"') else: print('PIP check pass') print('----') # Init-Load from .func_mark import * from diff_match_patch import diff_match_patch import netius.servers import waitress import werkzeug.routing import werkzeug.debug import flask import requests import pymysql if sys.version_info < (3, 6): import sha3 # Init-Global global_lang = {} global_wiki_set = {} global_db_set = '' conn = '' # Func # Func-main def load_conn(data): global conn conn = data def do_db_set(db_set): global global_db_set global_db_set = db_set # Func-init class get_db_connect_old: def __init__(self, db_set): self.db_set = db_set self.conn = '' def db_load(self): if self.db_set['type'] == 'sqlite': self.conn = sqlite3.connect( self.db_set['name'] + '.db', check_same_thread = False, isolation_level = None ) self.conn.execute('pragma journal_mode = wal') else: self.conn = pymysql.connect( host = self.db_set['mysql_host'], user = self.db_set['mysql_user'], password = self.db_set['mysql_pw'], charset = 'utf8mb4', port = int(self.db_set['mysql_port']), ) curs = self.conn.cursor() try: curs.execute(db_change( 'create database ' + self.db_set['name'] + ' ' + \ 'default character set utf8mb4;' )) except: pass self.conn.select_db(self.db_set['name']) load_conn(self.conn) return self.conn def db_get(self): # if self.db_set['type'] != 'sqlite': # self.conn.ping(reconnect = True) return self.conn class get_db_connect: # 임시 DB 커넥션 동작 구조 # Init 파트 # DB 커넥트(get_db_connect_old) -> func.py로 conn 넘겨줌 # route 파트 # DB 새로 커넥트 -> func.py에서 쓰던 conn은 conn_sub로 보관 -> # func.py로 conn 넘겨줌 -> 모든 라우터 과정이 끝나면 conn_sub를 다시 func.py에 conn으로 넘겨줌 -> # DB 커넥트 종료 def __init__(self): global global_db_set global conn self.conn_sub = conn self.db_set = global_db_set def __enter__(self): if self.db_set['type'] == 'sqlite': self.conn = sqlite3.connect( self.db_set['name'] + '.db', check_same_thread = False, isolation_level = None ) self.conn.execute('pragma journal_mode = wal') else: self.conn = pymysql.connect( host = self.db_set['mysql_host'], user = self.db_set['mysql_user'], password = self.db_set['mysql_pw'], charset = 'utf8mb4', port = int(self.db_set['mysql_port']), ) curs = self.conn.cursor() try: curs.execute(db_change( 'create database ' + self.db_set['name'] + ' ' + \ 'default character set utf8mb4;' )) except: pass self.conn.select_db(self.db_set['name']) load_conn(self.conn) return self.conn def __exit__(self, exc_type, exc_value, traceback): load_conn(self.conn_sub) self.conn.close() class class_check_json: def do_check_set_json(): if os.getenv('NAMU_DB') or os.getenv('NAMU_DB_TYPE'): set_data = {} set_data['db'] = os.getenv('NAMU_DB') if os.getenv('NAMU_DB') else 'data' set_data['db'] = os.getenv('NAMU_DB_TYPE') if os.getenv('NAMU_DB_TYPE') else 'sqlite' else: if os.path.exists(os.path.join('data', 'set.json')): db_set_list = ['db', 'db_type'] set_data = json.loads(open( os.path.join('data', 'set.json'), encoding = 'utf8' ).read()) for i in db_set_list: if not i in set_data: os.remove(os.path.join('data', 'set.json')) break if not os.path.exists(os.path.join('data', 'set.json')): set_data = {} normal_db_type = ['sqlite', 'mysql'] print('DB type (' + normal_db_type[0] + ') [' + ', '.join(normal_db_type) + '] : ', end = '') data_get = str(input()) if data_get == '' or not data_get in normal_db_type: set_data['db_type'] = 'sqlite' else: set_data['db_type'] = data_get all_src = [] if set_data['db_type'] == 'sqlite': for i_data in os.listdir("."): f_src = re.search(r"(.+)\.db$", i_data) if f_src: all_src += [f_src.group(1)] print('DB name (data) [' + ', '.join(all_src) + '] : ', end = '') data_get = str(input()) if data_get == '': set_data['db'] = 'data' else: set_data['db'] = data_get with open(os.path.join('data', 'set.json'), 'w', encoding = 'utf8') as f: f.write(json.dumps(set_data)) print('DB name : ' + set_data['db']) print('DB type : ' + set_data['db_type']) data_db_set = {} data_db_set['name'] = set_data['db'] data_db_set['type'] = set_data['db_type'] return data_db_set def do_check_mysql_json(data_db_set): if os.path.exists(os.path.join('data', 'mysql.json')): db_set_list = ['user', 'password', 'host', 'port'] set_data = json.loads( open( os.path.join('data', 'mysql.json'), encoding = 'utf8' ).read() ) for i in db_set_list: if not i in set_data: os.remove(os.path.join('data', 'mysql.json')) break set_data_mysql = set_data if not os.path.exists(os.path.join('data', 'mysql.json')): set_data_mysql = {} print('DB user ID : ', end = '') set_data_mysql['user'] = str(input()) print('DB password : ', end = '') set_data_mysql['password'] = str(input()) print('DB host (localhost) : ', end = '') set_data_mysql['host'] = str(input()) if set_data_mysql['host'] == '': set_data_mysql['host'] = 'localhost' print('DB port (3306) : ', end = '') set_data_mysql['port'] = str(input()) if set_data_mysql['port'] == '': set_data_mysql['port'] = '3306' with open( os.path.join('data', 'mysql.json'), 'w', encoding = 'utf8' ) as f: f.write(json.dumps(set_data_mysql)) data_db_set['mysql_user'] = set_data_mysql['user'] data_db_set['mysql_pw'] = set_data_mysql['password'] if 'host' in set_data_mysql: data_db_set['mysql_host'] = set_data_mysql['host'] else: data_db_set['mysql_host'] = 'localhost' if 'port' in set_data_mysql: data_db_set['mysql_port'] = set_data_mysql['port'] else: data_db_set['mysql_port'] = '3306' return data_db_set def __init__(self): self.data_db_set = {} def __new__(self): self.data_db_set = self.do_check_set_json() if self.data_db_set['type'] == 'mysql': self.data_db_set = self.do_check_mysql_json(self.data_db_set) return self.data_db_set def update(ver_num, set_data): curs = conn.cursor() print('----') # 업데이트 하위 호환 유지 함수 if ver_num < 3160027: print('Add init set') set_init() if ver_num < 3170002: curs.execute(db_change("select html from html_filter where kind = 'extension'")) if not curs.fetchall(): for i in ['jpg', 'jpeg', 'png', 'gif', 'webp']: curs.execute(db_change( "insert into html_filter (html, kind) values (?, 'extension')" ), [i]) if ver_num < 3170400: curs.execute(db_change("select title, sub, code from topic where id = '1'")) for i in curs.fetchall(): curs.execute(db_change( "update topic set code = ? where title = ? and sub = ?" ), [ i[2], i[0], i[1] ]) curs.execute(db_change( "update rd set code = ? where title = ? and sub = ?" ), [ i[2], i[0], i[1] ]) if ver_num < 3171800: curs.execute(db_change("select data from other where name = 'recaptcha'")) change_rec = curs.fetchall() if change_rec and change_rec[0][0] != '': new_rec = re.search(r'data-sitekey="([^"]+)"', change_rec[0][0]) if new_rec: curs.execute(db_change( "update other set data = ? where name = 'recaptcha'" ), [new_rec.group(1)]) else: curs.execute(db_change("update other set data = '' where name = 'recaptcha'")) curs.execute(db_change("update other set data = '' where name = 'sec_re'")) if ver_num < 3172800 and \ set_data['db_type'] == 'mysql': get_data_mysql = json.loads(open('data/mysql.json', encoding = 'utf8').read()) with open('data/mysql.json', 'w') as f: f.write('{ "user" : "' + get_data_mysql['user'] + '", "password" : "' + get_data_mysql['password'] + '", "host" : "localhost" }') if ver_num < 3183603: curs.execute(db_change("select block from ban where band = 'O'")) for i in curs.fetchall(): curs.execute(db_change( "update ban set block = ?, band = 'regex' where block = ? and band = 'O'" ), [ '^' + i[0].replace('.', '\\.'), i[0] ]) curs.execute(db_change("select block from rb where band = 'O'")) for i in curs.fetchall(): curs.execute(db_change( "update rb set block = ?, band = 'regex' where block = ? and band = 'O'" ), [ '^' + i[0].replace('.', '\\.'), i[0] ]) if ver_num < 3190201: today_time = get_time() curs.execute(db_change("select block, end, why, band, login from ban")) for i in curs.fetchall(): curs.execute(db_change( "insert into rb (block, end, today, why, band, login, ongoing) " + \ "values (?, ?, ?, ?, ?, ?, ?)" ), [ i[0], i[1], today_time, i[2], i[3], i[4], '1' ]) if ver_num < 3191301: curs.execute(db_change('' + \ 'select id, title, date from history ' + \ 'where not title like "user:%" ' + \ 'order by date desc ' + \ 'limit 50' + \ '')) data_list = curs.fetchall() for get_data in data_list: curs.execute(db_change( "insert into rc (id, title, date, type) values (?, ?, ?, 'normal')" ), [ get_data[0], get_data[1], get_data[2] ]) if ver_num < 3202400: curs.execute(db_change("select data from other where name = 'update'")) get_data = curs.fetchall() if get_data and get_data[0][0] == 'master': curs.execute(db_change("update other set data = 'beta' where name = 'update'"), []) if ver_num < 3202600: curs.execute(db_change("select name, regex, sub from filter")) for i in curs.fetchall(): curs.execute(db_change( "insert into html_filter (html, kind, plus, plus_t) " + \ "values (?, 'regex_filter', ?, ?)" ), [ i[0], i[1], i[2] ]) curs.execute(db_change("select title, link, icon from inter")) for i in curs.fetchall(): curs.execute(db_change( "insert into html_filter (html, kind, plus, plus_t) " + \ "values (?, 'inter_wiki', ?, ?)"), [ i[0], i[1], i[2] ]) if ver_num < 3203400: curs.execute(db_change("select user, css from custom")) for i in curs.fetchall(): curs.execute(db_change( "insert into user_set (name, id, data) values ('custom_css', ?, ?)" ), [ re.sub(r' \(head\)$', '', i[0]), i[1] ]) if ver_num < 3205500: curs.execute(db_change("select title, decu, dis, view, why from acl")) for i in curs.fetchall(): curs.execute(db_change( "insert into acl (title, data, type) values (?, ?, ?)" ), [i[0], i[1], 'decu']) curs.execute(db_change( "insert into acl (title, data, type) values (?, ?, ?)" ), [i[0], i[2], 'dis']) curs.execute(db_change( "insert into acl (title, data, type) values (?, ?, ?)" ), [i[0], i[3], 'view']) curs.execute(db_change( "insert into acl (title, data, type) values (?, ?, ?)" ), [i[0], i[4], 'why']) if ver_num < 3300101: # 캐시 초기화 curs.execute(db_change('delete from cache_data')) if ver_num < 3300301: # regex_filter 오류 해결 curs.execute(db_change( 'delete from html_filter where kind = "regex_filter" and html is null' )) if ver_num < 3302302: # user이랑 user_set 테이블의 통합 curs.execute(db_change('select id, pw, acl, date, encode from user')) for i in curs.fetchall(): curs.execute(db_change( "insert into user_set (name, id, data) values (?, ?, ?)" ), ['pw', i[0], i[1]]) curs.execute(db_change( "insert into user_set (name, id, data) values (?, ?, ?)" ), ['acl', i[0], i[2]]) curs.execute(db_change( "insert into user_set (name, id, data) values (?, ?, ?)" ), ['date', i[0], i[3]]) curs.execute(db_change( "insert into user_set (name, id, data) values (?, ?, ?)" ), ['encode', i[0], i[4]]) if ver_num < 3400101: # user_set이랑 user_application 테이블의 통합 curs.execute(db_change('' + \ 'select id, pw, date, encode, question, answer, ip, ua, email ' + \ 'from user_application' + \ '')) for i in curs.fetchall(): sql_data = {} sql_data['id'] = i[0] sql_data['pw'] = i[1] sql_data['date'] = i[2] sql_data['encode'] = i[3] sql_data['question'] = i[4] sql_data['answer'] = i[5] sql_data['ip'] = i[6] sql_data['ua'] = i[7] sql_data['email'] = i[8] curs.execute(db_change( "insert into user_set (name, id, data) values (?, ?, ?)" ), ['application', i[0], json.dumps(sql_data)]) conn.commit() # 아이피 상태인 이메일 제거 예정 print('Update completed') def set_init_always(ver_num): curs = conn.cursor() curs.execute(db_change('delete from other where name = "ver"')) curs.execute(db_change('insert into other (name, data) values ("ver", ?)'), [ver_num]) curs.execute(db_change('delete from alist where name = "owner"')) curs.execute(db_change('insert into alist (name, acl) values ("owner", "owner")')) if not os.path.exists(load_image_url()): os.makedirs(load_image_url()) conn.commit() def set_init(): curs = conn.cursor() # 초기값 설정 함수 curs.execute(db_change("select html from html_filter where kind = 'email'")) if not curs.fetchall(): for i in ['naver.com', 'gmail.com', 'daum.net', 'kakao.com']: curs.execute(db_change( "insert into html_filter (html, kind, plus, plus_t) values (?, 'email', '', '')" ), [i]) curs.execute(db_change("select html from html_filter where kind = 'extension'")) if not curs.fetchall(): for i in ['jpg', 'jpeg', 'png', 'gif', 'webp']: curs.execute(db_change( "insert into html_filter (html, kind, plus, plus_t) values (?, 'extension', '', '')" ), [i]) curs.execute(db_change( 'select data from other ' + \ 'where name = "smtp_server" or name = "smtp_port" or name = "smtp_security"' )) if not curs.fetchall(): for i in [ ['smtp_server', 'smtp.gmail.com'], ['smtp_port', '587'], ['smtp_security', 'starttls'] ]: curs.execute(db_change( "insert into other (name, data) values (?, ?)" ), [i[0], i[1]]) curs.execute(db_change('select data from other where name = "key"')) rep_data = curs.fetchall() if not rep_data: curs.execute(db_change('insert into other (name, data) values ("key", ?)'), [load_random_key()]) curs.execute(db_change('select data from other where name = "count_all_title"')) if not curs.fetchall(): curs.execute(db_change('insert into other (name, data) values ("count_all_title", "0")')) conn.commit() # Func-simple ## Func-simple-without_DB def get_default_admin_group(): return ['owner', 'ban'] def get_user_title_list(): # default user_title = { '' : load_lang('default'), '🌳' : '🌳 namu', } # admin if admin_check('all') == 1: user_title['✅'] = '✅ admin' return user_title def load_random_key(long = 128): return ''.join( random.choice( "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" ) for i in range(long) ) def http_warning(): return '''
''' def next_fix(link, num, page, end = 50): list_data = '' if num == 1: if len(page) == end: list_data += '' + \ '', '')
data = data.replace('', ' ' * 100)
return data[0 : 100]
def wiki_set(num = 1):
curs = conn.cursor()
if num == 1:
skin_name = skin_check(1)
data_list = []
curs.execute(db_change('select data from other where name = ?'), ['name'])
db_data = curs.fetchall()
data_list += [db_data[0][0]] if db_data and db_data[0][0] != '' else ['Wiki']
curs.execute(db_change('select data from other where name = "license"'))
db_data = curs.fetchall()
data_list += [db_data[0][0]] if db_data and db_data[0][0] != '' else ['ARR']
data_list += ['', '']
curs.execute(db_change('select data from other where name = "logo" and coverage = ?'), [skin_name])
db_data = curs.fetchall()
if db_data and db_data[0][0] != '':
data_list += [db_data[0][0]]
else:
curs.execute(db_change('select data from other where name = "logo" and coverage = ""'))
db_data = curs.fetchall()
data_list += [db_data[0][0]] if db_data and db_data[0][0] != '' else [data_list[0]]
head_data = ''
curs.execute(db_change("select data from other where name = 'head' and coverage = ''"))
db_data = curs.fetchall()
head_data += db_data[0][0] if db_data and db_data[0][0] != '' else ''
curs.execute(db_change("select data from other where name = 'head' and coverage = ?"), [skin_name])
db_data = curs.fetchall()
head_data += db_data[0][0] if db_data and db_data[0][0] != '' else ''
data_list += [head_data]
elif num == 2:
curs.execute(db_change('select data from other where name = "frontpage"'))
db_data = curs.fetchall()
data_list = db_data[0][0] if db_data and db_data[0][0] != '' else 'FrontPage'
elif num == 3:
curs.execute(db_change('select data from other where name = "upload"'))
db_data = curs.fetchall()
data_list = db_data[0][0] if db_data and db_data[0][0] != '' else '2'
else:
data_list = ''
return data_list
def wiki_custom():
curs = conn.cursor()
ip = ip_check()
if ip_or_user(ip) == 0:
user_icon = 1
user_name = ip
curs.execute(db_change("select data from user_set where id = ? and name = 'custom_css'"), [ip])
user_head = curs.fetchall()
user_head = user_head[0][0] if user_head else ''
curs.execute(db_change('select data from user_set where name = "email" and id = ?'), [ip])
email = curs.fetchall()
email = email[0][0] if email else ''
if admin_check('all') == 1:
user_admin = '1'
user_acl_list = []
curs.execute(db_change("select data from user_set where id = ? and name = 'acl'"), [ip])
curs.execute(db_change('select acl from alist where name = ?'), [curs.fetchall()[0][0]])
user_acl = curs.fetchall()
for i in user_acl:
user_acl_list += [i[0]]
user_acl_list = user_acl_list if user_acl != [] else '0'
else:
user_admin = '0'
user_acl_list = '0'
curs.execute(db_change("select count(*) from alarm where name = ?"), [ip])
count = curs.fetchall()
user_notice = str(count[0][0]) if count else '0'
else:
user_icon = 0
user_name = load_lang('user')
email = ''
user_admin = '0'
user_acl_list = '0'
user_notice = '0'
user_head = flask.session['head'] if 'head' in flask.session else ''
curs.execute(db_change("select title from rd where title = ? and stop = ''"), ['user:' + ip])
user_topic = '1' if curs.fetchall() else '0'
split_path = flask.request.path.split('/')
if len(split_path) > 1:
split_path = split_path[1]
else:
split_path = 0
return [
'',
'',
user_icon,
user_head,
email,
user_name,
user_admin,
str(ban_check()),
user_notice,
user_acl_list,
ip,
user_topic,
split_path
]
def load_skin(data = '', set_n = 0, default = 0):
# without_DB
# data -> 가장 앞에 있을 스킨 이름
# set_n == 0 -> 스트링으로 반환
# set_n == 1 -> 리스트로 반환
# default == 0 -> 디폴트 미포함
# default == 1 -> 디폴트 포함
if set_n == 0:
skin_return_data = ''
else:
skin_return_data = []
skin_list_get = os.listdir('views')
if default == 1:
skin_list_get = ['default'] + skin_list_get
for skin_data in skin_list_get:
if skin_data != 'default':
see_data = skin_data
else:
see_data = load_lang('default')
if skin_data != 'main_css':
if set_n == 0:
if skin_data == data:
skin_return_data = '' + \
'' + \
'' + skin_return_data
else:
skin_return_data += '' + \
'' + \
''
else:
if skin_data == data:
skin_return_data = [skin_data] + skin_return_data
else:
skin_return_data += [skin_data]
return skin_return_data
# Func-markup
def render_set(doc_name = '', doc_data = '', data_type = 'view', data_in = '', doc_acl = ''):
# without_DB
# data_type in ['view', 'raw', 'api_view', 'backlink']
doc_acl = acl_check(doc_name, 'render') if doc_acl == '' else doc_acl
doc_data = 0 if doc_data == None else doc_data
if doc_acl == 1:
return 'HTTP Request 401.3'
else:
if data_type == 'raw':
return doc_data
else:
if doc_data != 0:
get_class_render = class_do_render(conn)
return get_class_render.do_render(doc_name, doc_data, data_type, data_in)
else:
return 'HTTP Request 404'
# Func-request
def send_email(who, title, data):
curs = conn.cursor()
try:
curs.execute(db_change('' + \
'select name, data from other ' + \
'where name = "smtp_email" or name = "smtp_pass" or name = "smtp_server" or name = "smtp_port" or name = "smtp_security"' + \
''))
rep_data = curs.fetchall()
smtp_email = ''
smtp_pass = ''
smtp_server = ''
smtp_security = ''
smtp_port = ''
smtp = ''
for i in rep_data:
if i[0] == 'smtp_email':
smtp_email = i[1]
elif i[0] == 'smtp_pass':
smtp_pass = i[1]
elif i[0] == 'smtp_server':
smtp_server = i[1]
elif i[0] == 'smtp_security':
smtp_security = i[1]
elif i[0] == 'smtp_port':
smtp_port = i[1]
smtp_port = int(smtp_port)
if smtp_security == 'plain':
smtp = smtplib.SMTP(smtp_server, smtp_port)
elif smtp_security == 'starttls':
smtp = smtplib.SMTP(smtp_server, smtp_port)
smtp.starttls()
else:
# if smtp_security == 'tls':
smtp = smtplib.SMTP_SSL(smtp_server, smtp_port)
smtp.login(smtp_email, smtp_pass)
domain = load_domain()
wiki_name = wiki_set()[0]
msg = email.mime.text.MIMEText(data)
msg['Subject'] = title
msg['From'] = 'openNAMU