robot.Player 源代码

# -*- coding: utf-8-*-
import subprocess
import os
import platform
from . import utils
import _thread as thread
from robot import logging
from ctypes import CFUNCTYPE, c_char_p, c_int, cdll
from contextlib import contextmanager

logger = logging.getLogger(__name__)

[文档]def py_error_handler(filename, line, function, err, fmt): pass
ERROR_HANDLER_FUNC = CFUNCTYPE(None, c_char_p, c_int, c_char_p, c_int, c_char_p) c_error_handler = ERROR_HANDLER_FUNC(py_error_handler)
[文档]@contextmanager def no_alsa_error(): try: asound = cdll.LoadLibrary('libasound.so') asound.snd_lib_error_set_handler(c_error_handler) yield asound.snd_lib_error_set_handler(None) except: yield pass
[文档]def play(fname, onCompleted=None): player = getPlayerByFileName(fname) player.play(fname, onCompleted)
[文档]def getPlayerByFileName(fname): foo, ext = os.path.splitext(fname) if ext in ['.mp3', '.wav']: return SoxPlayer()
[文档]class AbstractPlayer(object): def __init__(self, **kwargs): super(AbstractPlayer, self).__init__()
[文档] def play(self): pass
[文档] def play_block(self): pass
[文档] def stop(self): pass
[文档] def is_playing(self): return False
[文档]class SoxPlayer(AbstractPlayer): SLUG = 'SoxPlayer' def __init__(self, **kwargs): super(SoxPlayer, self).__init__(**kwargs) self.playing = False self.proc = None self.delete = False self.onCompleteds = []
[文档] def doPlay(self): cmd = ['play', str(self.src)] logger.debug('Executing %s', ' '.join(cmd)) self.proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) self.playing = True self.proc.wait() self.playing = False if self.delete: utils.check_and_delete(self.src) logger.debug('play completed') if self.proc.returncode == 0: for onCompleted in self.onCompleteds: if onCompleted: onCompleted() self.onCompleteds = []
[文档] def play(self, src, delete=False, onCompleted=None, wait=False): if (os.path.exists(src)): self.src = src self.delete = delete self.onCompleteds.append(onCompleted) if not wait: thread.start_new_thread(self.doPlay, ()) else: self.doPlay() else: logger.critical('path not exists: {}'.format(src))
[文档] def appendOnCompleted(self, onCompleted): if onCompleted: self.onCompleteds.append(onCompleted)
[文档] def play_block(self): self.run()
[文档] def stop(self): if self.proc: self.onCompleteds = [] self.proc.terminate() if self.delete: utils.check_and_delete(self.src)
[文档] def is_playing(self): return self.playing
[文档]class MusicPlayer(SoxPlayer): """ 给音乐播放器插件使用的, 在 SOXPlayer 的基础上增加了列表的支持, 并支持暂停和恢复播放 """ SLUG = 'MusicPlayer' def __init__(self, playlist, plugin, **kwargs): super(MusicPlayer, self).__init__(**kwargs) self.playlist = playlist self.plugin = plugin self.idx = 0 self.pausing = False self.last_paused = None
[文档] def update_playlist(self, playlist): super().stop() self.playlist = playlist self.idx = 0 self.play()
[文档] def play(self): logger.debug('MusicPlayer play') path = self.playlist[self.idx] super().stop() super().play(path, False, self.next)
[文档] def next(self): logger.debug('MusicPlayer next') super().stop() self.idx = (self.idx+1) % len(self.playlist) self.play()
[文档] def prev(self): logger.debug('MusicPlayer prev') super().stop() self.idx = (self.idx-1) % len(self.playlist) self.play()
[文档] def pause(self): logger.debug('MusicPlayer pause') self.pausing = True
[文档] def stop(self): if self.proc: logger.debug('MusicPlayer stop') # STOP current play process self.last_paused = utils.write_temp_file(str(self.proc.pid), 'pid', 'w') self.onCompleteds = [] subprocess.run(['pkill', '-STOP', '-F', self.last_paused])
[文档] def resume(self): logger.debug('MusicPlayer resume') self.pausing = False self.onCompleteds = [self.next] if self.last_paused is not None: print(self.last_paused) subprocess.run(['pkill', '-CONT', '-F', self.last_paused])
[文档] def is_playing(self): return self.playing
[文档] def is_pausing(self): return self.pausing
[文档] def turnUp(self): system = platform.system() if system == 'Darwin': res = subprocess.run(['osascript', '-e', 'output volume of (get volume settings)'], shell=False, capture_output=True, text=True) volume = int(res.stdout.strip()) volume += 20 if volume >= 100: volume = 100 self.plugin.say('音量已经最大啦', wait=True) subprocess.run(['osascript', '-e', 'set volume output volume {}'.format(volume)]) elif system == 'Linux': res = subprocess.run(["amixer sget Master | grep 'Mono:' | awk -F'[][]' '{ print $2 }'"], shell=True, capture_output=True, text=True) print(res.stdout) if res.stdout != '' and res.stdout.strip().endswith('%'): volume = int(res.stdout.strip().replace('%', '')) volume += 20 if volume >= 100: volume = 100 self.plugin.say('音量已经最大啦', wait=True) subprocess.run(['amixer', 'set', 'Master', '{}%'.format(volume)]) else: subprocess.run(['amixer', 'set', 'Master', '20%+']) else: self.plugin.say('当前系统不支持调节音量') self.resume()
[文档] def turnDown(self): system = platform.system() if system == 'Darwin': res = subprocess.run(['osascript', '-e', 'output volume of (get volume settings)'], shell=False, capture_output=True, text=True) volume = int(res.stdout.strip()) volume -= 20 if volume <= 20: volume = 20 self.plugin.say('音量已经很小啦', wait=True) subprocess.run(['osascript', '-e', 'set volume output volume {}'.format(volume)]) elif system == 'Linux': res = subprocess.run(["amixer sget Master | grep 'Mono:' | awk -F'[][]' '{ print $2 }'"], shell=True, capture_output=True, text=True) if res.stdout != '' and res.stdout.endswith('%'): volume = int(res.stdout.replace('%', '').strip()) volume -= 20 if volume <= 20: volume = 20 self.plugin.say('音量已经最小啦', wait=True) subprocess.run(['amixer', 'set', 'Master', '{}%'.format(volume)]) else: subprocess.run(['amixer', 'set', 'Master', '20%-']) else: self.plugin.say('当前系统不支持调节音量') self.resume()