Skip to content
Snippets Groups Projects
Commit f5510dba authored by Louis-Philippe Véronneau's avatar Louis-Philippe Véronneau
Browse files

New upstream version 1.0.3

parent d267575d
No related branches found
No related tags found
No related merge requests found
name: 'Tests'
on:
push:
branches: [ '**' ]
defaults:
run:
shell: bash
jobs:
test-linux:
runs-on: ubuntu-22.04
name: 'Linux - Python'
strategy:
matrix:
python-version: [ '3.8', '3.9', '3.10' ]
env:
DISPLAY: :0
PY_MPV_SKIP_TESTS: >-
test_wait_for_property_event_overflow
steps:
- uses: actions/checkout@v3
- name: 'Install Python'
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: 'Update Packages'
run: |
function execute() { echo -e "\033[0;34m$*\033[0m"; "$@"; }
execute sudo apt update -y
execute sudo apt upgrade -y
- name: 'Install Dependencies'
run: |
function execute() { echo -e "\033[0;34m$*\033[0m"; "$@"; }
execute sudo apt install -y libmpv1 xvfb
- name: 'Start Xvfb'
run: |
echo -e "\033[0;34msudo /usr/bin/Xvfb $DISPLAY -screen 0 1920x1080x24 &\033[0m";
sudo /usr/bin/Xvfb $DISPLAY -screen 0 1920x1080x24 &
- name: 'Setup Test Environment'
run: |
function execute() { echo -e "\033[0;34m$*\033[0m"; "$@"; }
execute python -m venv venv
execute source venv/bin/activate
execute python -m pip install --upgrade pip
execute python -m pip install wheel
execute python -m pip install -r tests/requirements.txt
- name: 'Run Python Tests'
run: |
function execute() { echo -e "\033[0;34m$*\033[0m"; "$@"; }
execute source venv/bin/activate
execute xvfb-run python -m pytest
test-windows:
runs-on: windows-latest
name: 'Windows - Python'
strategy:
matrix:
python-version: [ '3.8', '3.9', '3.10' ]
steps:
- uses: actions/checkout@v3
- name: 'Install Python'
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: 'Provide libmpv'
run: |
function execute() { echo -e "\033[0;34m$*\033[0m"; "$@"; }
ARTIFACT="mpv-dev-x86_64-20220619-git-c1a46ec.7z"
URL="https://sourceforge.net/projects/mpv-player-windows/files/libmpv/$ARTIFACT"
execute curl -L -O "$URL"
execute 7z x "$ARTIFACT"
execute mv mpv-2.dll tests
- name: 'Setup Test Environment'
run: |
function execute() { echo -e "\033[0;34m$*\033[0m"; "$@"; }
execute python -m venv venv
execute source venv/Scripts/activate
execute python -m pip install --upgrade pip
execute python -m pip install wheel
execute python -m pip install -r tests/requirements.txt
- name: 'Run Python Tests'
run: |
function execute() { echo -e "\033[0;34m$*\033[0m"; "$@"; }
execute source venv/Scripts/activate
execute python -m pytest
......@@ -11,7 +11,7 @@ Installation
.. code:: bash
pip install python-mpv
pip install mpv
...though you can also realistically just copy `mpv.py`_ into your project as it's all nicely contained in one file.
......@@ -34,7 +34,7 @@ into ctypes, which is different to the one Windows uses internally. Consult `thi
Python >= 3.7 (officially)
..........................
The ``master`` branch officially only supports recent python releases (3.5 onwards), but there is the somewhat outdated
The ``main`` branch officially only supports recent python releases (3.5 onwards), but there is the somewhat outdated
but functional `py2compat branch`_ providing Python 2 compatibility.
.. _`py2compat branch`: https://github.com/jaseg/python-mpv/tree/py2compat
......@@ -64,7 +64,7 @@ Usage
player.play('https://youtu.be/DOmdB7D-pUU')
player.wait_for_playback()
python-mpv mostly exposes mpv's built-in API to python, adding only some porcelain on top. Most "`input commands <https://mpv.io/manual/master/#list-of-input-commands>`_" are mapped to methods of the MPV class. Check out these methods and their docstrings in `the source <https://github.com/jaseg/python-mpv/blob/master/mpv.py>`__ for things you can do. Additional controls and status information are exposed through `MPV properties <https://mpv.io/manual/master/#properties>`_. These can be accessed like ``player.metadata``, ``player.fullscreen`` and ``player.loop_playlist``.
python-mpv mostly exposes mpv's built-in API to python, adding only some porcelain on top. Most "`input commands <https://mpv.io/manual/master/#list-of-input-commands>`_" are mapped to methods of the MPV class. Check out these methods and their docstrings in `the source <https://github.com/jaseg/python-mpv/blob/main/mpv.py>`__ for things you can do. Additional controls and status information are exposed through `MPV properties <https://mpv.io/manual/master/#properties>`_. These can be accessed like ``player.metadata``, ``player.fullscreen`` and ``player.loop_playlist``.
Threading
~~~~~~~~~
......@@ -241,7 +241,8 @@ The easiest way to load custom subtitles from a file is to pass the ``--sub-file
import mpv
player = mpv.MPV()
player.play('test.webm', sub_file='test.srt')
player.loadfile('test.webm', sub_file='test.srt')
player.wait_for_playback()
Note that you can also pass many other options to ``loadfile``. See the mpv docs for details.
......@@ -395,7 +396,7 @@ python-mpv inherits the underlying libmpv's license, which can be either GPLv2 o
For details, see `the mpv copyright page`_.
.. _`PEP 8`: https://www.python.org/dev/peps/pep-0008/
.. _`mpv.py`: https://raw.githubusercontent.com/jaseg/python-mpv/master/mpv.py
.. _`mpv.py`: https://raw.githubusercontent.com/jaseg/python-mpv/main/mpv.py
.. _cosven: https://github.com/cosven
.. _Robozman: https://gitlab.com/robozman
.. _dfaker: https://github.com/dfaker
......
......@@ -10,8 +10,8 @@ if [ -n "$(git diff --name-only --cached)" ]; then
exit 2
fi
sed -i "s/^\\(\\s*version\\s*=\\s*['\"]\\)[^'\"]*\\(['\"]\\s*,\\s*\\)$/\\1"$VER"\\2/" setup.py
git add setup.py
sed -i "s/^\\(\\s*version\\s*=\\s*['\"]\\)[^'\"]*\\(['\"]\\s*\\)$/\\1"$VER"\\2/" pyproject.toml
git add pyproject.toml
git commit -m "Version $VER" --no-edit
env QUBES_GPG_DOMAIN=gpg git -c gpg.program=qubes-gpg-client -c user.signingkey=E36F75307F0A0EC2D145FF5CED7A208EEEC76F2D -c user.email=python-mpv@jaseg.de tag -s "v$VER" -m "Version $VER"
git -c user.signingkey=E36F75307F0A0EC2D145FF5CED7A208EEEC76F2D -c user.email=python-mpv@jaseg.de tag -s "v$VER" -m "Version $VER"
git push --tags origin
......@@ -4,20 +4,23 @@
# Python MPV library module
# Copyright (C) 2017-2022 Sebastian Götte <code@jaseg.net>
#
# This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later
# version.
# python-mpv inherits the underlying libmpv's license, which can be either GPLv2 or later (default) or LGPLv2.1 or
# later. For details, see the mpv copyright page here: https://github.com/mpv-player/mpv/blob/master/Copyright
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You may copy, modify, and redistribute this file under the terms of the GNU General Public License version 2 (or, at
# your option, any later version), or the GNU Lesser General Public License as published by the Free Software
# Foundation; either version 2.1 of the License, or (at your option) any later version.
#
# You should have received a copy of the GNU General Public License along with this program; if not, write to the Free
# Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License and the GNU
# Lesser General Public License for more details.
#
# You can find copies of the GPLv2 and LGPLv2.1 licenses in the project repository's LICENSE.GPL and LICENSE.LGPL files.
from ctypes import *
import ctypes.util
import threading
import queue
import os
import sys
from warnings import warn
......@@ -292,7 +295,7 @@ class MpvEventID(c_int):
FILE_LOADED, CLIENT_MESSAGE, VIDEO_RECONFIG, AUDIO_RECONFIG, SEEK, PLAYBACK_RESTART, PROPERTY_CHANGE)
def __repr__(self):
return f'<MpvEventID {self.value} (_mpv_event_name(self.value).decode("utf-8"))>'
return f'<MpvEventID {self.value} {_mpv_event_name(self.value).decode("utf-8")}>'
@classmethod
def from_str(kls, s):
......@@ -537,6 +540,11 @@ def _mpv_client_api_version():
ver = backend.mpv_client_api_version()
return ver>>16, ver&0xFFFF
MPV_VERSION = _mpv_client_api_version()
if MPV_VERSION < (1, 108):
ver = '.'.join(str(num) for num in MPV_VERSION)
raise RuntimeError(f"python-mpv requires libmpv with an API version of 1.108 or higher (libmpv >= 0.33), but you have an older version ({ver}).")
backend.mpv_free.argtypes = [c_void_p]
_mpv_free = backend.mpv_free
......@@ -873,6 +881,7 @@ class MPV(object):
self.register_stream_protocol('python', self._python_stream_open)
self._python_streams = {}
self._python_stream_catchall = None
self._exception_futures = set()
self.overlay_ids = set()
self.overlays = {}
if loglevel is not None or log_handler is not None:
......@@ -884,6 +893,20 @@ class MPV(object):
else:
self._event_thread = None
@contextmanager
def _enqueue_exceptions(self):
try:
yield
except Exception as e:
for fut in self._exception_futures:
try:
fut.set_exception(e)
break
except InvalidStateError:
pass
else:
warn(f'Unhandled exception on python-mpv event loop: {e}\n{traceback.format_exc()}', RuntimeWarning)
def _loop(self):
for event in _event_generator(self._event_handle):
try:
......@@ -894,45 +917,51 @@ class MPV(object):
self._core_shutdown = True
for callback in self._event_callbacks:
callback(event)
with self._enqueue_exceptions():
callback(event)
if eid == MpvEventID.PROPERTY_CHANGE:
pc = event.data
name, value, _fmt = pc.name, pc.value, pc.format
for handler in self._property_handlers[name]:
handler(name, value)
with self._enqueue_exceptions():
handler(name, value)
if eid == MpvEventID.LOG_MESSAGE and self._log_handler is not None:
ev = event.data
self._log_handler(ev.level, ev.prefix, ev.text)
with self._enqueue_exceptions():
self._log_handler(ev.level, ev.prefix, ev.text)
if eid == MpvEventID.CLIENT_MESSAGE:
# {'event': {'args': ['key-binding', 'foo', 'u-', 'g']}, 'reply_userdata': 0, 'error': 0, 'event_id': 16}
target, *args = event.data.args
target = target.decode("utf-8")
if target in self._message_handlers:
self._message_handlers[target](*args)
with self._enqueue_exceptions():
self._message_handlers[target](*args)
if eid == MpvEventID.COMMAND_REPLY:
key = event.reply_userdata
callback = self._command_reply_callbacks.pop(key, None)
if callback:
callback(ErrorCode.exception_for_ec(event.error), event.data)
with self._enqueue_exceptions():
callback(ErrorCode.exception_for_ec(event.error), event.data)
if eid == MpvEventID.QUEUE_OVERFLOW:
# cache list, since error handlers will unregister themselves
for cb in list(self._command_reply_callbacks.values()):
cb(EventOverflowError('libmpv event queue has flown over because events have not been processed fast enough'), None)
with self._enqueue_exceptions():
cb(EventOverflowError('libmpv event queue has flown over because events have not been processed fast enough'), None)
if eid == MpvEventID.SHUTDOWN:
_mpv_destroy(self._event_handle)
for cb in list(self._command_reply_callbacks.values()):
cb(ShutdownError('libmpv core has been shutdown'), None)
with self._enqueue_exceptions():
cb(ShutdownError('libmpv core has been shutdown'), None)
return
except Exception as e:
print('Exception inside python-mpv event loop:', file=sys.stderr)
traceback.print_exc()
warn(f'Unhandled {e} inside python-mpv event loop!\n{traceback.format_exc()}', RuntimeWarning)
@property
def core_shutdown(self):
......@@ -946,35 +975,35 @@ class MPV(object):
if self._core_shutdown:
raise ShutdownError('libmpv core has been shutdown')
def wait_until_paused(self, timeout=None):
def wait_until_paused(self, timeout=None, catch_errors=True):
"""Waits until playback of the current title is paused or done. Raises a ShutdownError if the core is shutdown while
waiting."""
self.wait_for_property('core-idle', timeout=timeout)
self.wait_for_property('core-idle', timeout=timeout, catch_errors=catch_errors)
def wait_for_playback(self, timeout=None):
def wait_for_playback(self, timeout=None, catch_errors=True):
"""Waits until playback of the current title is finished. Raises a ShutdownError if the core is shutdown while
waiting.
"""
self.wait_for_event('end_file', timeout=timeout)
self.wait_for_event('end_file', timeout=timeout, catch_errors=catch_errors)
def wait_until_playing(self, timeout=None):
def wait_until_playing(self, timeout=None, catch_errors=True):
"""Waits until playback of the current title has started. Raises a ShutdownError if the core is shutdown while
waiting."""
self.wait_for_property('core-idle', lambda idle: not idle, timeout=timeout)
self.wait_for_property('core-idle', lambda idle: not idle, timeout=timeout, catch_errors=catch_errors)
def wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None):
def wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None, catch_errors=True):
"""Waits until ``cond`` evaluates to a truthy value on the named property. This can be used to wait for
properties such as ``idle_active`` indicating the player is done with regular playback and just idling around.
Raises a ShutdownError when the core is shutdown while waiting.
"""
with self.prepare_and_wait_for_property(name, cond, level_sensitive, timeout=timeout) as result:
with self.prepare_and_wait_for_property(name, cond, level_sensitive, timeout=timeout, catch_errors=catch_errors) as result:
pass
return result.result()
def wait_for_shutdown(self, timeout=None):
def wait_for_shutdown(self, timeout=None, catch_errors=True):
'''Wait for core to shutdown (e.g. through quit() or terminate()).'''
try:
self.wait_for_event(None, timeout=timeout)
self.wait_for_event(None, timeout=timeout, catch_errors=catch_errors)
except ShutdownError:
return
......@@ -992,7 +1021,7 @@ class MPV(object):
return shutdown_handler.unregister_mpv_events
@contextmanager
def prepare_and_wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None):
def prepare_and_wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None, catch_errors=True):
"""Context manager that waits until ``cond`` evaluates to a truthy value on the named property. See
prepare_and_wait_for_event for usage.
Raises a ShutdownError when the core is shutdown while waiting. Re-raises any errors inside ``cond``.
......@@ -1016,6 +1045,9 @@ class MPV(object):
try:
result.set_running_or_notify_cancel()
if catch_errors:
self._exception_futures.add(result)
yield result
rv = cond(getattr(self, name.replace('-', '_')))
......@@ -1028,18 +1060,19 @@ class MPV(object):
finally:
err_unregister()
self.unobserve_property(name, observer)
self._exception_futures.discard(result)
def wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None):
def wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None, catch_errors=True):
"""Waits for the indicated event(s). If cond is given, waits until cond(event) is true. Raises a ShutdownError
if the core is shutdown while waiting. This also happens when 'shutdown' is in event_types. Re-raises any error
inside ``cond``.
"""
with self.prepare_and_wait_for_event(*event_types, cond=cond, timeout=timeout) as result:
with self.prepare_and_wait_for_event(*event_types, cond=cond, timeout=timeout, catch_errors=catch_errors) as result:
pass
return result.result()
@contextmanager
def prepare_and_wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None):
def prepare_and_wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None, catch_errors=True):
"""Context manager that waits for the indicated event(s) like wait_for_event after running. If cond is given,
waits until cond(event) is true. Raises a ShutdownError if the core is shutdown while waiting. This also happens
when 'shutdown' is in event_types. Re-raises any error inside ``cond``.
......@@ -1057,7 +1090,6 @@ class MPV(object):
@self.event_callback(*event_types)
def target_handler(evt):
try:
rv = cond(evt)
if rv:
......@@ -1074,13 +1106,18 @@ class MPV(object):
try:
result.set_running_or_notify_cancel()
if catch_errors:
self._exception_futures.add(result)
yield result
self.check_core_alive()
result.result(timeout)
finally:
err_unregister()
target_handler.unregister_mpv_events()
self._exception_futures.discard(result)
def __del__(self):
if self.handle:
......@@ -1385,7 +1422,7 @@ class MPV(object):
"""Mapped mpv print-text command, see man mpv(1)."""
self.command('print-text', text)
def show_text(self, string, duration='-1', level=None):
def show_text(self, string, duration='-1', level=0):
"""Mapped mpv show_text command, see man mpv(1)."""
self.command('show_text', string, duration, level)
......@@ -1478,13 +1515,13 @@ class MPV(object):
function decorator if no handler is given.
To unregister the observer, call either of ``mpv.unobserve_property(name, handler)``,
``mpv.unobserve_all_properties(handler)`` or the handler's ``unregister_mpv_properties`` attribute::
``mpv.unobserve_all_properties(handler)`` or the handler's ``unobserve_mpv_properties`` attribute::
@player.observe_property('volume')
def my_handler(new_volume, *):
print("It's loud!", volume)
@player.property_observer('volume')
def my_handler(property_name, new_volume):
print("It's loud!", new_volume)
my_handler.unregister_mpv_properties()
my_handler.unobserve_mpv_properties()
exit_handler is a function taking no arguments that is called when the underlying mpv handle is terminated (e.g.
from calling MPV.terminate() or issuing a "quit" input command).
......@@ -1765,32 +1802,68 @@ class MPV(object):
frontend = open_fn(uri.decode('utf-8'))
except ValueError:
return ErrorCode.LOADING_FAILED
except Exception as e:
for fut in self._exception_futures:
try:
fut.set_exception(e)
break
except InvalidStateError:
pass
else:
warnings.warn(f'Unhandled exception {e} inside stream open callback for URI {uri}\n{traceback.format_exc()}')
def read_backend(_userdata, buf, bufsize):
data = frontend.read(bufsize)
for i in range(len(data)):
buf[i] = data[i]
return len(data)
return ErrorCode.LOADING_FAILED
cb_info.contents.cookie = None
def read_backend(_userdata, buf, bufsize):
with self._enqueue_exceptions():
data = frontend.read(bufsize)
for i in range(len(data)):
buf[i] = data[i]
return len(data)
return -1
read = cb_info.contents.read = StreamReadFn(read_backend)
close = cb_info.contents.close = StreamCloseFn(lambda _userdata: frontend.close())
def close_backend(_userdata):
with self._enqueue_exceptions():
del self._stream_protocol_frontends[proto][uri]
if hasattr(frontend, 'close'):
frontend.close()
close = cb_info.contents.close = StreamCloseFn(close_backend)
seek, size, cancel = None, None, None
if hasattr(frontend, 'seek'):
seek = cb_info.contents.seek = StreamSeekFn(lambda _userdata, offx: frontend.seek(offx))
def seek_backend(_userdata, offx):
with self._enqueue_exceptions():
return frontend.seek(offx)
return ErrorCode.GENERIC
seek = cb_info.contents.seek = StreamSeekFn(seek_backend)
if hasattr(frontend, 'size') and frontend.size is not None:
size = cb_info.contents.size = StreamSizeFn(lambda _userdata: frontend.size)
def size_backend(_userdata):
with self._enqueue_exceptions():
return frontend.size
return 0
size = cb_info.contents.size = StreamSizeFn(size_backend)
if hasattr(frontend, 'cancel'):
cancel = cb_info.contents.cancel = StreamCancelFn(lambda _userdata: frontend.cancel())
def cancel_backend(_userdata):
with self._enqueue_exceptions():
frontend.cancel()
cancel = cb_info.contents.cancel = StreamCancelFn(cancel_backend)
# keep frontend and callbacks in memory forever (TODO)
# keep frontend and callbacks in memory until closed
frontend._registered_callbacks = [read, close, seek, size, cancel]
self._stream_protocol_frontends[proto][uri] = frontend
return 0
if proto in self._stream_protocol_cbs:
raise KeyError('Stream protocol already registered')
# keep backend in memory forever
self._stream_protocol_cbs[proto] = [open_backend]
_mpv_stream_cb_add_ro(self.handle, proto.encode('utf-8'), c_void_p(), open_backend)
......
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
py-modules = ['mpv']
[project]
name = "mpv"
version = "v1.0.3"
description = "A python interface to the mpv media player"
readme = "README.rst"
authors = [{name = "jaseg", email = "mpv@jaseg.de"}]
license = {text = "GPLv2+ or LGPLv2.1+"}
requires-python = ">=3.7"
keywords = ['mpv', 'library', 'video', 'audio', 'player', 'display', 'multimedia']
classifiers = [
'Development Status :: 5 - Production/Stable',
'Environment :: X11 Applications',
'Intended Audience :: Developers',
'License :: OSI Approved :: GNU General Public License v2 or later (GPLv2+)',
'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)',
'Natural Language :: English',
'Operating System :: POSIX',
'Programming Language :: C',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Multimedia :: Sound/Audio :: Players',
'Topic :: Multimedia :: Video :: Display'
]
[project.urls]
homepage = "https://github.com/jaseg/python-mpv"
[project.optional-dependencies]
screenshot_raw = ["Pillow"]
test = ['xvfbwrapper']
[metadata]
description-file = README.rst
#!/usr/bin/env python3
from setuptools import setup
from pathlib import Path
setup(
name = 'python-mpv',
version = '1.0.1',
py_modules = ['mpv'],
description = 'A python interface to the mpv media player',
long_description = (Path(__file__).parent / 'README.rst').read_text(),
long_description_content_type = 'text/x-rst',
url = 'https://github.com/jaseg/python-mpv',
author = 'jaseg',
author_email = 'github@jaseg.net',
license = 'GPLv3+',
extras_require = {
'screenshot_raw': ['Pillow']
},
tests_require = ['xvfbwrapper'],
test_suite = 'tests',
keywords = ['mpv', 'library', 'video', 'audio', 'player', 'display',
'multimedia'],
python_requires='>=3.7',
classifiers = [
'Development Status :: 5 - Production/Stable',
'Environment :: X11 Applications',
'Intended Audience :: Developers',
'License :: OSI Approved :: GNU General Public License v2 or later (GPLv2+)',
'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)',
'Natural Language :: English',
'Operating System :: POSIX',
'Programming Language :: C',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Multimedia :: Sound/Audio :: Players',
'Topic :: Multimedia :: Video :: Display']
)
xvfbwrapper>=0.2.9
pytest>=7.1.2
......@@ -18,18 +18,11 @@
import unittest
from unittest import mock
import math
import threading
from contextlib import contextmanager
from functools import wraps
import gc
import os.path
import os
import sys
import time
import io
import platform
import ctypes
from concurrent.futures import Future
os.environ["PATH"] = os.path.dirname(__file__) + os.pathsep + os.environ["PATH"]
......@@ -49,6 +42,8 @@ else:
TESTVID = os.path.join(os.path.dirname(__file__), 'test.webm')
TESTSRT = os.path.join(os.path.dirname(__file__), 'sub_test.srt')
MPV_ERRORS = [ l(ec) for ec, l in mpv.ErrorCode.EXCEPTION_DICT.items() if l ]
SKIP_TESTS = os.environ.get('PY_MPV_SKIP_TESTS', '').split()
def timed_print():
start_time = time.time()
......@@ -56,6 +51,7 @@ def timed_print():
td = time.time() - start_time
print('{:.3f} [{}] {}: {}'.format(td, level, prefix, text), flush=True)
class MpvTestCase(unittest.TestCase):
def setUp(self):
self.disp = Xvfb()
......@@ -66,6 +62,7 @@ class MpvTestCase(unittest.TestCase):
self.m.terminate()
self.disp.stop()
class TestProperties(MpvTestCase):
@contextmanager
def swallow_mpv_errors(self, exception_exceptions=[]):
......@@ -295,6 +292,7 @@ class ObservePropertyTest(MpvTestCase):
mock.call('slang', ['ru'])],
any_order=True)
class KeyBindingTest(MpvTestCase):
def test_register_direct_cmd(self):
self.m.register_key_binding('a', 'playlist-clear')
......@@ -441,6 +439,7 @@ class KeyBindingTest(MpvTestCase):
handler1.assert_has_calls([])
handler2.assert_has_calls([ mock.call() ])
class TestStreams(unittest.TestCase):
def test_python_stream(self):
handler = mock.Mock()
......@@ -536,6 +535,116 @@ class TestStreams(unittest.TestCase):
m.terminate()
disp.stop()
def test_stream_open_exception(self):
disp = Xvfb()
disp.start()
m = mpv.MPV(vo=testvo, video=False)
@m.register_stream_protocol('raiseerror')
def open_fn(uri):
raise SystemError()
waiting = threading.Semaphore()
result = Future()
def run():
result.set_running_or_notify_cancel()
try:
waiting.release()
m.wait_for_playback()
result.set_result(False)
except SystemError:
result.set_result(True)
except Exception:
result.set_result(False)
t = threading.Thread(target=run, daemon=True)
t.start()
with waiting:
time.sleep(0.2)
m.play('raiseerror://foo')
m.wait_for_playback(catch_errors=False)
try:
assert result.result()
finally:
m.terminate()
disp.stop()
def test_python_stream_exception(self):
disp = Xvfb()
disp.start()
m = mpv.MPV(vo=testvo)
@m.python_stream('foo')
def foo_gen():
with open(TESTVID, 'rb') as f:
yield f.read(100)
raise SystemError()
waiting = threading.Semaphore()
result = Future()
def run():
result.set_running_or_notify_cancel()
try:
waiting.release()
m.wait_for_playback()
result.set_result(False)
except SystemError:
result.set_result(True)
except Exception:
result.set_result(False)
t = threading.Thread(target=run, daemon=True)
t.start()
with waiting:
time.sleep(0.2)
m.play('python://foo')
m.wait_for_playback(catch_errors=False)
try:
assert result.result()
finally:
m.terminate()
disp.stop()
def test_stream_open_forward(self):
disp = Xvfb()
disp.start()
m = mpv.MPV(vo=testvo, video=False)
@m.register_stream_protocol('raiseerror')
def open_fn(uri):
raise ValueError()
waiting = threading.Semaphore()
result = Future()
def run():
result.set_running_or_notify_cancel()
try:
waiting.release()
m.wait_for_playback()
result.set_result(True)
except Exception:
result.set_result(False)
t = threading.Thread(target=run, daemon=True)
t.start()
with waiting:
time.sleep(0.2)
m.play('raiseerror://foo')
m.wait_for_playback(catch_errors=False)
try:
assert result.result()
finally:
m.terminate()
disp.stop()
class TestLifecycle(unittest.TestCase):
def test_create_destroy(self):
thread_names = lambda: [ t.name for t in threading.enumerate() ]
......@@ -577,8 +686,8 @@ class TestLifecycle(unittest.TestCase):
mock.call({'event': 'start-file', 'playlist_entry_id': 1}),
mock.call({'event': 'end-file', 'reason': 'error', 'playlist_entry_id': 1, 'file_error': 'no audio or video data played'})
], any_order=True)
time.sleep(1)
handler.reset_mock()
m.terminate()
handler.assert_not_called()
......@@ -600,6 +709,7 @@ class TestLifecycle(unittest.TestCase):
t.start()
time.sleep(1)
m.terminate()
time.sleep(1)
t.join()
self.disp.stop()
assert result.result()
......@@ -620,13 +730,13 @@ class TestLifecycle(unittest.TestCase):
m.mute = True
t.join()
m.terminate()
time.sleep(1)
handler.assert_called()
self.disp.stop()
def test_wait_for_event(self):
self.disp = Xvfb()
self.disp.start()
handler = mock.Mock()
m = mpv.MPV(vo=testvo)
m.play(TESTVID)
result = Future()
......@@ -649,7 +759,6 @@ class TestLifecycle(unittest.TestCase):
def test_wait_for_property_shutdown(self):
self.disp = Xvfb()
self.disp.start()
handler = mock.Mock()
m = mpv.MPV(vo=testvo)
m.play(TESTVID)
with self.assertRaises(mpv.ShutdownError):
......@@ -657,12 +766,13 @@ class TestLifecycle(unittest.TestCase):
# handle
with m.prepare_and_wait_for_property('mute', level_sensitive=False):
m.terminate()
time.sleep(1)
self.disp.stop()
def test_wait_for_prooperty_event_overflow(self):
@unittest.skipIf('test_wait_for_property_event_overflow' in SKIP_TESTS, reason="kills X-Server first")
def test_wait_for_property_event_overflow(self):
self.disp = Xvfb()
self.disp.start()
handler = mock.Mock()
m = mpv.MPV(vo=testvo)
m.play(TESTVID)
with self.assertRaises(mpv.EventOverflowError):
......@@ -677,10 +787,10 @@ class TestLifecycle(unittest.TestCase):
m.command_async('script-message', 'foo', 'bar')
except:
pass
m.terminate()
time.sleep(1)
self.disp.stop()
def test_wait_for_event_shutdown(self):
self.disp = Xvfb()
self.disp.start()
......@@ -694,7 +804,6 @@ class TestLifecycle(unittest.TestCase):
def test_wait_for_shutdown(self):
self.disp = Xvfb()
self.disp.start()
handler = mock.Mock()
m = mpv.MPV(vo=testvo)
m.play(TESTVID)
with self.assertRaises(mpv.ShutdownError):
......@@ -766,7 +875,6 @@ class CommandTests(MpvTestCase):
callback.assert_any_call(None, None)
class RegressionTests(MpvTestCase):
def test_unobserve_property_runtime_error(self):
......@@ -820,7 +928,3 @@ class RegressionTests(MpvTestCase):
m.slang = 'ru'
m.terminate() # needed for synchronization of event thread
handler.assert_has_calls([mock.call('slang', ['jp']), mock.call('slang', ['ru'])])
if __name__ == '__main__':
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment