青龙面板平台签到函数
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

229 lines
8.3 KiB

import json
import os
import random
import re
import time
import traceback
# import tomli_w
from functools import wraps
import tomli
from checksendNotify import send
def toml_to_json(toml_path, to_json_path):
"""
:param toml_path: 需要转换的toml文件的路径
:param to_json_path: 需要输出的json文件路径
:return: None
"""
with open(toml_path, "rb") as f:
toml_dict = tomli.load(f)
json_date = json.dumps(toml_dict, indent=4, ensure_ascii=False)
with open(to_json_path, 'w', encoding="utf8") as s:
s.write(json_date)
# def json_to_toml(json_path, to_toml_path):
# with open(json_path, "r", encoding="utf8") as f:
# json_dict = json.load(f)
# with open(to_toml_path, "wb") as f:
# tomli_w.dump(json_dict, f)
class config_get(object):
def __init__(self, config_path=None):
"""
config_path: 自定义配置文件路径
config_file: 实际使用的配置文件路径
config_format: 实际使用的配置文件格式
"""
self.config_path = config_path
self.config_file = self.get_config_file()
self.config_format = self.get_config_format()
def get_config_format(self):
if self.config_file.endswith('.toml'):
return "toml"
else:
return "json"
def get_config_file(self):
ql_new = '/ql/config/env.sh'
json_config_file = '/ql/config/check.json'
toml_config_file = '/ql/config/check.toml'
print('开始检查环境\n')
if os.path.exists(ql_new):
print('成功 当前环境为青龙面板继续执行\n')
if self.config_path is not None:
print("使用了自定义路径的配置文件")
return self.config_path
elif os.path.exists(toml_config_file):
print("未使用自定义配置文件,开始从ql/config中检测")
print("检测到toml格式配置文件\n")
return toml_config_file
elif os.path.exists(json_config_file):
print("检测到json格式配置文件\n")
return json_config_file
else:
print("未检测到配置文件,程序退出")
exit(1)
else:
print('失败 请检查环境')
exit(0)
return 0
def get_real_key(self, expression):
"""
从配置文件中获取,re表达式想要的KEY
:return:
"""
pattern = re.compile(expression, re.I)
real_key = ''
if self.config_format == "toml":
for key in self.get_key_for_toml(self.config_file):
if pattern.match(key) is not None:
real_key = key
else:
for key in self.get_key_for_json(self.config_file):
if pattern.match(key) is not None:
real_key = key
if real_key is not '':
return real_key
else:
print("啊哦没有找到")
exit(1)
def get_value(self, expression):
real_key = self.get_real_key(expression)
if self.config_format == "toml":
return self.get_value_for_toml(self.config_file, real_key)
else:
return self.get_value_for_json(self.config_file, real_key)
@staticmethod
def get_value_for_toml(toml_path, key):
with open(toml_path, "rb") as f:
try:
toml_dict = tomli.load(f)
return toml_dict.get(key)
except tomli.TOMLDecodeError:
print(
f"错误:配置文件 {toml_path} 格式不对,请学习 https://toml.io/cn/v1.0.0\n错误信息:\n{traceback.format_exc()}"
)
exit(1)
@staticmethod
def get_value_for_json(json_path, key):
with open(json_path, "r", encoding="utf8") as f:
try:
json_dict = json.load(f)
return json_dict.get(key)
except json.decoder.JSONDecodeError:
print(f"错误:配置文件 {json_path} 格式不对,错误信息{traceback.format_exc()}")
@staticmethod
def get_key_for_toml(toml_path):
with open(toml_path, "rb") as f:
try:
toml_dict = tomli.load(f)
return toml_dict.keys()
except tomli.TOMLDecodeError:
print(
f"错误:配置文件 {toml_path} 格式不对,请学习 https://toml.io/cn/v1.0.0\n错误信息:\n{traceback.format_exc()}"
)
exit(1)
@staticmethod
def get_key_for_json(json_path):
with open(json_path, "r", encoding="utf8") as f:
try:
json_dict = json.load(f)
return json_dict.keys()
except json.decoder.JSONDecodeError:
print(f"错误:配置文件 {json_path} 格式不对,错误信息{traceback.format_exc()}")
class check(object):
def __init__(self, run_script_name, run_script_expression, Configuration_flag=False):
"""
:param run_script_name: 执行脚本的说明
:param run_script_expression: 需要获取的配置键的re表达式
:param Configuration_flag: 是否只检测True或False(默认为False)
"""
self.run_script_name = run_script_name
self.run_script_expression = run_script_expression
self.Configuration_flag = Configuration_flag
@staticmethod
def other_task():
# change_db()
pass
def __call__(self, func):
@wraps(func)
def wrapper():
if not self.Configuration_flag:
config = config_get()
value_list = config.get_value(self.run_script_expression)
Push_message = ""
num = 1
for value in value_list:
print(f"<----------------账号【{num}】---------------->")
print(f"获取到的账号信息为:{value}\n")
num += 1
try:
result = func(value=value)
print(f"执行结果:\n{result}")
Push_message += result
except IndexError:
print("可能是示例格式被运行\n错误信息:")
print(f"{traceback.format_exc()}")
Push_message += ''
except AttributeError:
print("可能是配置文件的键名出现问题\n"
"例如:在此次更新中什么值得买的键名从smzdm_cookie变成了cookie\n")
print(f"{traceback.format_exc()}")
Push_message += ''
except TypeError:
print(f"{traceback.format_exc()}")
Push_message += ''
send(self.run_script_name, Push_message)
else:
config = config_get()
flag = config.get_value(self.run_script_expression)
if flag is not None and flag:
print(f"开始执行{self.run_script_name}")
func()
else:
print(f"设置为不执行{self.run_script_name}")
return wrapper
def change_cron(cron_file_path="/ql/db/crontab.db", repositories="yuxian158_check"):
def change_time(time_str: str):
words = re.sub("\\s+", " ", time_str).split()
words[0] = str(random.randrange(60))
words[1] = str(random.randrange(22))
return " ".join(words)
time_str = time.strftime("%Y-%m-%d", time.localtime())
os.system(f"cp /ql/db/crontab.db /ql/db/crontab.db.{time_str}.back")
lines = []
with open(cron_file_path, "r", encoding="UTF-8") as f:
for i in f.readlines():
# print(record.get("command"))
if i.find(repositories) != -1:
record = json.loads(i)
record["schedule"] = change_time(record["schedule"])
lines.append(json.dumps(record, ensure_ascii=False) + "\n")
else:
lines.append(i)
with open(cron_file_path, "w", encoding="UTF-8") as f:
f.writelines(lines)
if __name__ == "__main__":
change_cron()