|
|
|
import json
|
|
|
|
import os
|
|
|
|
import random
|
|
|
|
import re
|
|
|
|
import time
|
|
|
|
import traceback
|
|
|
|
# import tomli_w
|
|
|
|
from functools import wraps
|
|
|
|
|
|
|
|
import tomli
|
|
|
|
|
|
|
|
from checksendNotify import send
|
|
|
|
|
|
|
|
|
|
|
|
def toml_to_json(toml_path, to_json_path):
|
|
|
|
"""
|
|
|
|
:param toml_path: 需要转换的toml文件的路径
|
|
|
|
:param to_json_path: 需要输出的json文件路径
|
|
|
|
:return: None
|
|
|
|
"""
|
|
|
|
with open(toml_path, "rb") as f:
|
|
|
|
toml_dict = tomli.load(f)
|
|
|
|
json_date = json.dumps(toml_dict, indent=4, ensure_ascii=False)
|
|
|
|
with open(to_json_path, 'w', encoding="utf8") as s:
|
|
|
|
s.write(json_date)
|
|
|
|
|
|
|
|
|
|
|
|
# def json_to_toml(json_path, to_toml_path):
|
|
|
|
# with open(json_path, "r", encoding="utf8") as f:
|
|
|
|
# json_dict = json.load(f)
|
|
|
|
# with open(to_toml_path, "wb") as f:
|
|
|
|
# tomli_w.dump(json_dict, f)
|
|
|
|
|
|
|
|
class config_get(object):
|
|
|
|
def __init__(self, config_path=None):
|
|
|
|
"""
|
|
|
|
config_path: 自定义配置文件路径
|
|
|
|
config_file: 实际使用的配置文件路径
|
|
|
|
config_format: 实际使用的配置文件格式
|
|
|
|
"""
|
|
|
|
self.config_path = config_path
|
|
|
|
self.config_file = self.get_config_file()
|
|
|
|
self.config_format = self.get_config_format()
|
|
|
|
|
|
|
|
def get_config_format(self):
|
|
|
|
if self.config_file.endswith('.toml'):
|
|
|
|
return "toml"
|
|
|
|
else:
|
|
|
|
return "json"
|
|
|
|
|
|
|
|
def get_config_file(self):
|
|
|
|
ql_new = '/ql/config/env.sh'
|
|
|
|
json_config_file = '/ql/config/check.json'
|
|
|
|
toml_config_file = '/ql/config/check.toml'
|
|
|
|
print('开始检查环境\n')
|
|
|
|
if os.path.exists(ql_new):
|
|
|
|
print('成功 当前环境为青龙面板继续执行\n')
|
|
|
|
if self.config_path is not None:
|
|
|
|
print("使用了自定义路径的配置文件")
|
|
|
|
return self.config_path
|
|
|
|
elif os.path.exists(toml_config_file):
|
|
|
|
print("未使用自定义配置文件,开始从ql/config中检测")
|
|
|
|
print("检测到toml格式配置文件\n")
|
|
|
|
return toml_config_file
|
|
|
|
elif os.path.exists(json_config_file):
|
|
|
|
print("检测到json格式配置文件\n")
|
|
|
|
return json_config_file
|
|
|
|
else:
|
|
|
|
print("未检测到配置文件,程序退出")
|
|
|
|
exit(1)
|
|
|
|
else:
|
|
|
|
print('失败 请检查环境')
|
|
|
|
exit(0)
|
|
|
|
return 0
|
|
|
|
|
|
|
|
def get_real_key(self, expression):
|
|
|
|
"""
|
|
|
|
从配置文件中获取,re表达式想要的KEY
|
|
|
|
:return:
|
|
|
|
"""
|
|
|
|
pattern = re.compile(expression, re.I)
|
|
|
|
real_key = ''
|
|
|
|
if self.config_format == "toml":
|
|
|
|
for key in self.get_key_for_toml(self.config_file):
|
|
|
|
if pattern.match(key) is not None:
|
|
|
|
real_key = key
|
|
|
|
else:
|
|
|
|
for key in self.get_key_for_json(self.config_file):
|
|
|
|
if pattern.match(key) is not None:
|
|
|
|
real_key = key
|
|
|
|
if real_key is not '':
|
|
|
|
return real_key
|
|
|
|
else:
|
|
|
|
print("啊哦没有找到")
|
|
|
|
exit(1)
|
|
|
|
|
|
|
|
def get_value(self, expression):
|
|
|
|
real_key = self.get_real_key(expression)
|
|
|
|
if self.config_format == "toml":
|
|
|
|
return self.get_value_for_toml(self.config_file, real_key)
|
|
|
|
else:
|
|
|
|
return self.get_value_for_json(self.config_file, real_key)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def get_value_for_toml(toml_path, key):
|
|
|
|
with open(toml_path, "rb") as f:
|
|
|
|
try:
|
|
|
|
toml_dict = tomli.load(f)
|
|
|
|
return toml_dict.get(key)
|
|
|
|
except tomli.TOMLDecodeError:
|
|
|
|
print(
|
|
|
|
f"错误:配置文件 {toml_path} 格式不对,请学习 https://toml.io/cn/v1.0.0\n错误信息:\n{traceback.format_exc()}"
|
|
|
|
)
|
|
|
|
exit(1)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def get_value_for_json(json_path, key):
|
|
|
|
with open(json_path, "r", encoding="utf8") as f:
|
|
|
|
try:
|
|
|
|
json_dict = json.load(f)
|
|
|
|
return json_dict.get(key)
|
|
|
|
except json.decoder.JSONDecodeError:
|
|
|
|
print(f"错误:配置文件 {json_path} 格式不对,错误信息{traceback.format_exc()}")
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def get_key_for_toml(toml_path):
|
|
|
|
with open(toml_path, "rb") as f:
|
|
|
|
try:
|
|
|
|
toml_dict = tomli.load(f)
|
|
|
|
return toml_dict.keys()
|
|
|
|
except tomli.TOMLDecodeError:
|
|
|
|
print(
|
|
|
|
f"错误:配置文件 {toml_path} 格式不对,请学习 https://toml.io/cn/v1.0.0\n错误信息:\n{traceback.format_exc()}"
|
|
|
|
)
|
|
|
|
exit(1)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def get_key_for_json(json_path):
|
|
|
|
with open(json_path, "r", encoding="utf8") as f:
|
|
|
|
try:
|
|
|
|
json_dict = json.load(f)
|
|
|
|
return json_dict.keys()
|
|
|
|
except json.decoder.JSONDecodeError:
|
|
|
|
print(f"错误:配置文件 {json_path} 格式不对,错误信息{traceback.format_exc()}")
|
|
|
|
|
|
|
|
|
|
|
|
class check(object):
|
|
|
|
def __init__(self, run_script_name, run_script_expression, Configuration_flag=False):
|
|
|
|
"""
|
|
|
|
:param run_script_name: 执行脚本的说明
|
|
|
|
:param run_script_expression: 需要获取的配置键的re表达式
|
|
|
|
:param Configuration_flag: 是否只检测True或False(默认为False)
|
|
|
|
"""
|
|
|
|
self.run_script_name = run_script_name
|
|
|
|
self.run_script_expression = run_script_expression
|
|
|
|
self.Configuration_flag = Configuration_flag
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def other_task():
|
|
|
|
# change_db()
|
|
|
|
pass
|
|
|
|
|
|
|
|
def __call__(self, func):
|
|
|
|
@wraps(func)
|
|
|
|
def wrapper():
|
|
|
|
if not self.Configuration_flag:
|
|
|
|
config = config_get()
|
|
|
|
value_list = config.get_value(self.run_script_expression)
|
|
|
|
Push_message = ""
|
|
|
|
num = 1
|
|
|
|
for value in value_list:
|
|
|
|
print(f"<----------------账号【{num}】---------------->")
|
|
|
|
print(f"获取到的账号信息为:{value}\n")
|
|
|
|
num += 1
|
|
|
|
try:
|
|
|
|
result = func(value=value)
|
|
|
|
print(f"执行结果:\n{result}")
|
|
|
|
Push_message += result
|
|
|
|
except IndexError:
|
|
|
|
print("可能是示例格式被运行\n错误信息:")
|
|
|
|
print(f"{traceback.format_exc()}")
|
|
|
|
Push_message += ''
|
|
|
|
except AttributeError:
|
|
|
|
print("可能是配置文件的键名出现问题\n"
|
|
|
|
"例如:在此次更新中什么值得买的键名从smzdm_cookie变成了cookie\n")
|
|
|
|
print(f"{traceback.format_exc()}")
|
|
|
|
Push_message += ''
|
|
|
|
except TypeError:
|
|
|
|
print(f"{traceback.format_exc()}")
|
|
|
|
Push_message += ''
|
|
|
|
send(self.run_script_name, Push_message)
|
|
|
|
else:
|
|
|
|
config = config_get()
|
|
|
|
flag = config.get_value(self.run_script_expression)
|
|
|
|
if flag is not None and flag:
|
|
|
|
print(f"开始执行{self.run_script_name}")
|
|
|
|
func()
|
|
|
|
else:
|
|
|
|
print(f"设置为不执行{self.run_script_name}")
|
|
|
|
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
|
|
|
def change_cron(cron_file_path="/ql/db/crontab.db", repositories="yuxian158_check"):
|
|
|
|
def change_time(time_str: str):
|
|
|
|
words = re.sub("\\s+", " ", time_str).split()
|
|
|
|
words[0] = str(random.randrange(60))
|
|
|
|
words[1] = str(random.randrange(22))
|
|
|
|
return " ".join(words)
|
|
|
|
|
|
|
|
time_str = time.strftime("%Y-%m-%d", time.localtime())
|
|
|
|
os.system(f"cp /ql/db/crontab.db /ql/db/crontab.db.{time_str}.back")
|
|
|
|
lines = []
|
|
|
|
with open(cron_file_path, "r", encoding="UTF-8") as f:
|
|
|
|
for i in f.readlines():
|
|
|
|
# print(record.get("command"))
|
|
|
|
if i.find(repositories) != -1:
|
|
|
|
record = json.loads(i)
|
|
|
|
record["schedule"] = change_time(record["schedule"])
|
|
|
|
lines.append(json.dumps(record, ensure_ascii=False) + "\n")
|
|
|
|
else:
|
|
|
|
lines.append(i)
|
|
|
|
|
|
|
|
with open(cron_file_path, "w", encoding="UTF-8") as f:
|
|
|
|
f.writelines(lines)
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
change_cron()
|