Source code for pyppeteer.execution_context

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Execution Context Module."""

import logging
import math
import re
from typing import Any, Dict, Optional, TYPE_CHECKING

from pyppeteer import helper
from pyppeteer.connection import CDPSession
from pyppeteer.errors import ElementHandleError, NetworkError
from pyppeteer.helper import debugError

if TYPE_CHECKING:
    from pyppeteer.element_handle import ElementHandle  # noqa: F401
    from pyppeteer.frame_manager import Frame  # noqa: F401

logger = logging.getLogger(__name__)

EVALUATION_SCRIPT_URL = '__pyppeteer_evaluation_script__'
SOURCE_URL_REGEX = re.compile(
    r'^[\040\t]*//[@#] sourceURL=\s*(\S*?)\s*$',
    re.MULTILINE,
)


[docs]class ExecutionContext(object): """Execution Context class.""" def __init__(self, client: CDPSession, contextPayload: Dict, objectHandleFactory: Any, frame: 'Frame' = None) -> None: self._client = client self._frame = frame self._contextId = contextPayload.get('id') auxData = contextPayload.get('auxData', {'isDefault': True}) self._frameId = auxData.get('frameId', None) self._isDefault = bool(auxData.get('isDefault')) self._objectHandleFactory = objectHandleFactory @property def frame(self) -> Optional['Frame']: """Return frame associated with this execution context.""" return self._frame
[docs] async def evaluate(self, pageFunction: str, *args: Any, force_expr: bool = False) -> Any: """Execute ``pageFunction`` on this context. Details see :meth:`pyppeteer.page.Page.evaluate`. """ handle = await self.evaluateHandle( pageFunction, *args, force_expr=force_expr) try: result = await handle.jsonValue() except NetworkError as e: if 'Object reference chain is too long' in e.args[0]: return if 'Object couldn\'t be returned by value' in e.args[0]: return raise await handle.dispose() return result
[docs] async def evaluateHandle(self, pageFunction: str, *args: Any, # noqa: C901 force_expr: bool = False) -> 'JSHandle': """Execute ``pageFunction`` on this context. Details see :meth:`pyppeteer.page.Page.evaluateHandle`. """ suffix = f'//# sourceURL={EVALUATION_SCRIPT_URL}' if force_expr or (not args and not helper.is_jsfunc(pageFunction)): try: if SOURCE_URL_REGEX.match(pageFunction): expressionWithSourceUrl = pageFunction else: expressionWithSourceUrl = f'{pageFunction}\n{suffix}' _obj = await self._client.send('Runtime.evaluate', { 'expression': expressionWithSourceUrl, 'contextId': self._contextId, 'returnByValue': False, 'awaitPromise': True, 'userGesture': True, }) except Exception as e: _rewriteError(e) exceptionDetails = _obj.get('exceptionDetails') if exceptionDetails: raise ElementHandleError( 'Evaluation failed: {}'.format( helper.getExceptionMessage(exceptionDetails))) remoteObject = _obj.get('result') return self._objectHandleFactory(remoteObject) try: _obj = await self._client.send('Runtime.callFunctionOn', { 'functionDeclaration': f'{pageFunction}\n{suffix}\n', 'executionContextId': self._contextId, 'arguments': [self._convertArgument(arg) for arg in args], 'returnByValue': False, 'awaitPromise': True, 'userGesture': True, }) except Exception as e: _rewriteError(e) exceptionDetails = _obj.get('exceptionDetails') if exceptionDetails: raise ElementHandleError('Evaluation failed: {}'.format( helper.getExceptionMessage(exceptionDetails))) remoteObject = _obj.get('result') return self._objectHandleFactory(remoteObject)
def _convertArgument(self, arg: Any) -> Dict: # noqa: C901 if arg == math.inf: return {'unserializableValue': 'Infinity'} if arg == -math.inf: return {'unserializableValue': '-Infinity'} objectHandle = arg if isinstance(arg, JSHandle) else None if objectHandle: if objectHandle._context != self: raise ElementHandleError('JSHandles can be evaluated only in the context they were created!') # noqa: E501 if objectHandle._disposed: raise ElementHandleError('JSHandle is disposed!') if objectHandle._remoteObject.get('unserializableValue'): return {'unserializableValue': objectHandle._remoteObject.get('unserializableValue')} # noqa: E501 if not objectHandle._remoteObject.get('objectId'): return {'value': objectHandle._remoteObject.get('value')} return {'objectId': objectHandle._remoteObject.get('objectId')} return {'value': arg}
[docs] async def queryObjects(self, prototypeHandle: 'JSHandle') -> 'JSHandle': """Send query. Details see :meth:`pyppeteer.page.Page.queryObjects`. """ if prototypeHandle._disposed: raise ElementHandleError('Prototype JSHandle is disposed!') if not prototypeHandle._remoteObject.get('objectId'): raise ElementHandleError( 'Prototype JSHandle must not be referencing primitive value') response = await self._client.send('Runtime.queryObjects', { 'prototypeObjectId': prototypeHandle._remoteObject['objectId'], }) return self._objectHandleFactory(response.get('objects'))
[docs]class JSHandle(object): """JSHandle class. JSHandle represents an in-page JavaScript object. JSHandle can be created with the :meth:`~pyppeteer.page.Page.evaluateHandle` method. """ def __init__(self, context: ExecutionContext, client: CDPSession, remoteObject: Dict) -> None: self._context = context self._client = client self._remoteObject = remoteObject self._disposed = False @property def executionContext(self) -> ExecutionContext: """Get execution context of this handle.""" return self._context
[docs] async def getProperty(self, propertyName: str) -> 'JSHandle': """Get property value of ``propertyName``.""" objectHandle = await self._context.evaluateHandle( '''(object, propertyName) => { const result = {__proto__: null}; result[propertyName] = object[propertyName]; return result; }''', self, propertyName) properties = await objectHandle.getProperties() result = properties[propertyName] await objectHandle.dispose() return result
[docs] async def getProperties(self) -> Dict[str, 'JSHandle']: """Get all properties of this handle.""" response = await self._client.send('Runtime.getProperties', { 'objectId': self._remoteObject.get('objectId', ''), 'ownProperties': True, }) result = dict() for prop in response['result']: if not prop.get('enumerable'): continue result[prop.get('name')] = self._context._objectHandleFactory( prop.get('value')) return result
[docs] async def jsonValue(self) -> Dict: """Get Jsonized value of this object.""" objectId = self._remoteObject.get('objectId') if objectId: response = await self._client.send('Runtime.callFunctionOn', { 'functionDeclaration': 'function() { return this; }', 'objectId': objectId, 'returnByValue': True, 'awaitPromise': True, }) return helper.valueFromRemoteObject(response['result']) return helper.valueFromRemoteObject(self._remoteObject)
[docs] def asElement(self) -> Optional['ElementHandle']: """Return either null or the object handle itself.""" return None
[docs] async def dispose(self) -> None: """Stop referencing the handle.""" if self._disposed: return self._disposed = True try: await helper.releaseObject(self._client, self._remoteObject) except Exception as e: debugError(logger, e)
[docs] def toString(self) -> str: """Get string representation.""" if self._remoteObject.get('objectId'): _type = (self._remoteObject.get('subtype') or self._remoteObject.get('type')) return f'JSHandle@{_type}' return 'JSHandle:{}'.format( helper.valueFromRemoteObject(self._remoteObject))
def _rewriteError(error: Exception) -> None: if error.args[0].endswith('Cannot find context with specified id'): msg = 'Execution context was destroyed, most likely because of a navigation.' # noqa: E501 raise type(error)(msg) raise error