robot.utils 源代码

# -*- coding: utf-8-*-

import os
import tempfile
import wave
import shutil
import re
import time
import hashlib
import subprocess
from . import constants, config
from robot import logging
from pydub import AudioSegment
from pytz import timezone
import _thread as thread

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

logger = logging.getLogger(__name__)

do_not_bother = False

[文档]def sendEmail(SUBJECT, BODY, ATTACH_LIST, TO, FROM, SENDER, PASSWORD, SMTP_SERVER, SMTP_PORT): """ 发送邮件 :param SUBJECT: 邮件标题 :param BODY: 邮件正文 :param ATTACH_LIST: 附件 :param TO: 收件人 :param FROM: 发件人 :param SENDER: 发件人信息 :param PASSWORD: 密码 :param SMTP_SERVER: smtp 服务器 :param SMTP_PORT: smtp 端口号 :returns: True: 发送成功; False: 发送失败 """ txt = MIMEText(BODY.encode('utf-8'), 'html', 'utf-8') msg = MIMEMultipart() msg.attach(txt) for attach in ATTACH_LIST: try: att = MIMEText(open(attach, 'rb').read(), 'base64', 'utf-8') filename = os.path.basename(attach) att["Content-Type"] = 'application/octet-stream' att["Content-Disposition"] = 'attachment; filename="%s"' % filename msg.attach(att) except Exception: logger.error(u'附件 %s 发送失败!' % attach) continue msg['From'] = SENDER msg['To'] = TO msg['Subject'] = SUBJECT try: session = smtplib.SMTP() session.connect(SMTP_SERVER, SMTP_PORT) session.starttls() session.login(FROM, PASSWORD) session.sendmail(SENDER, TO, msg.as_string()) session.close() return True except Exception as e: logger.error(e) return False
[文档]def emailUser(SUBJECT="", BODY="", ATTACH_LIST=[]): """ 给用户发送邮件 :param SUBJECT: subject line of the email :param BODY: body text of the email :returns: True: 发送成功; False: 发送失败 """ # add footer if BODY: BODY = u"%s,<br><br>这是您要的内容:<br>%s<br>" % (config['first_name'], BODY) recipient = config.get('/email/address', '') robot_name = config.get('robot_name_cn', 'wukong-robot') recipient = robot_name + " <%s>" % recipient user = config.get('/email/address', '') password = config.get('/email/password', '') server = config.get('/email/smtp_server', '') port = config.get('/email/smtp_port', '') if not recipient or not user or not password or not server or not port: return False try: sendEmail(SUBJECT, BODY, ATTACH_LIST, user, user, recipient, password, server, port) return True except Exception as e: logger.error(e) return False
[文档]def get_file_content(filePath): """ 读取文件内容并返回 :param filePath: 文件路径 :returns: 文件内容 :raises IOError: 读取失败则抛出 IOError """ with open(filePath, 'rb') as fp: return fp.read()
[文档]def check_and_delete(fp, wait=0): """ 检查并删除文件/文件夹 :param fp: 文件路径 """ def run(): if wait > 0: time.sleep(wait) if isinstance(fp, str) and os.path.exists(fp): if os.path.isfile(fp): os.remove(fp) else: shutil.rmtree(fp) thread.start_new_thread(run, ())
[文档]def write_temp_file(data, suffix, mode='w+b'): """ 写入临时文件 :param data: 数据 :param suffix: 后缀名 :param mode: 写入模式,默认为 w+b :returns: 文件保存后的路径 """ with tempfile.NamedTemporaryFile(mode=mode, suffix=suffix, delete=False) as f: f.write(data) tmpfile = f.name return tmpfile
[文档]def get_pcm_from_wav(wav_path): """ 从 wav 文件中读取 pcm :param wav_path: wav 文件路径 :returns: pcm 数据 """ wav = wave.open(wav_path, 'rb') return wav.readframes(wav.getnframes())
[文档]def convert_wav_to_mp3(wav_path): """ 将 wav 文件转成 mp3 :param wav_path: wav 文件路径 :returns: mp3 文件路径 """ if not os.path.exists(wav_path): logger.critical("文件错误 {}".format(wav_path)) return None mp3_path = wav_path.replace('.wav', '.mp3') AudioSegment.from_wav(wav_path).export(mp3_path, format="mp3") return mp3_path
[文档]def convert_mp3_to_wav(mp3_path): """ 将 mp3 文件转成 wav :param mp3_path: mp3 文件路径 :returns: wav 文件路径 """ target = mp3_path.replace(".mp3", ".wav") if not os.path.exists(mp3_path): logger.critical("文件错误 {}".format(mp3_path)) return None AudioSegment.from_mp3(mp3_path).export(target, format="wav") return target
[文档]def clean(): """ 清理垃圾数据 """ temp = constants.TEMP_PATH temp_files = os.listdir(temp) for f in temp_files: if os.path.isfile(os.path.join(temp, f)) and re.match(r'output[\d]*\.wav', os.path.basename(f)): os.remove(os.path.join(temp, f))
[文档]def is_proper_time(): """ 是否合适时间 """ if do_not_bother == True: return False if not config.has('do_not_bother'): return True bother_profile = config.get('do_not_bother') if not bother_profile['enable']: return True if 'since' not in bother_profile or 'till' not in bother_profile: return True since = bother_profile['since'] till = bother_profile['till'] current = time.localtime(time.time()).tm_hour if till > since: return current not in range(since, till) else: return not (current in range(since, 25) or current in range(-1, till))
[文档]def get_do_not_bother_on_hotword(): """ 打开勿扰模式唤醒词 """ return config.get('/do_not_bother/on_hotword', '悟空别吵.pmdl')
[文档]def get_do_not_bother_off_hotword(): """ 关闭勿扰模式唤醒词 """ return config.get('/do_not_bother/off_hotword', '悟空醒醒.pmdl')
[文档]def getTimezone(): """ 获取时区 """ return timezone(config.get('timezone', 'HKT'))
[文档]def getCache(msg): """ 获取缓存的语音 """ md5 = hashlib.md5(msg.encode('utf-8')).hexdigest() mp3_cache = os.path.join(constants.TEMP_PATH, md5 + '.mp3') wav_cache = os.path.join(constants.TEMP_PATH, md5 + '.wav') if os.path.exists(mp3_cache): return mp3_cache elif os.path.exists(wav_cache): return wav_cache return None
[文档]def saveCache(voice, msg): """ 获取缓存的语音 """ foo, ext = os.path.splitext(voice) md5 = hashlib.md5(msg.encode('utf-8')).hexdigest() target = os.path.join(constants.TEMP_PATH, md5+ext) shutil.copyfile(voice, target) return target
[文档]def lruCache(): """ 清理最近未使用的缓存 """ def run(*args): if config.get('/lru_cache/enable', True): days = config.get('/lru_cache/days', 7) subprocess.run('find . -name "*.mp3" -atime +%d -exec rm {} \;' % days, cwd=constants.TEMP_PATH, shell=True) thread.start_new_thread(run, ())