「利用者:夜泣き/スクリプト」の版間の差分

>Fet-Fe
(→‎コード: v4.1.3 reCAPTCHAでbot検知された場合に自動で再起動する)
>Fet-Fe
4行目: 4行目:
‎<syntaxhighlight lang="python3" line>
‎<syntaxhighlight lang="python3" line>
#!/usr/bin/env python3
#!/usr/bin/env python3
# © 2022 恒心教 (Koushinism)
# Released under the MIT license
# https://opensource.org/licenses/mit-license.php


"""Twitter自動収集スクリプト
"""Twitter自動収集スクリプト


ver4.1.3 2023/10/22恒心
ver4.1.4 2023/10/29恒心


当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です
61行目: 65行目:
import subprocess
import subprocess
import sys
import sys
import warnings
from abc import ABCMeta, abstractmethod
from argparse import ArgumentParser, Namespace
from argparse import ArgumentParser, Namespace
from collections.abc import Callable
from collections.abc import Callable
70行目: 74行目:
from time import sleep
from time import sleep
from types import MappingProxyType, TracebackType
from types import MappingProxyType, TracebackType
from typing import Final, NamedTuple, NoReturn, Self, override
from typing import (Final, NamedTuple, NoReturn, Self, assert_never, final,
                    override)
from urllib.parse import quote, unquote, urljoin
from urllib.parse import quote, unquote, urljoin
from zoneinfo import ZoneInfo
from zoneinfo import ZoneInfo
88行目: 93行目:
logger: Final[Logger] = getLogger(__name__)
logger: Final[Logger] = getLogger(__name__)
logger.setLevel(logging.INFO)  # basicConfigで設定するとモジュールのDEBUGログなども出力される
logger.setLevel(logging.INFO)  # basicConfigで設定するとモジュールのDEBUGログなども出力される
# おそらくはツイートの内容によってMarkupResemblesLocatorWarningが吐き出されることがあるので無効化
warnings.simplefilter('ignore')




105行目: 107行目:




class AbstractAccessor:
class AbstractAccessor(metaclass=ABCMeta):
     """HTTPリクエストでWebサイトに接続するための基底クラス。
     """HTTPリクエストでWebサイトに接続するための基底クラス。
     """
     """
125行目: 127行目:
     """
     """


    @abstractmethod
    def get(self, url: str) -> str:
        """URLにアクセスして、HTMLを取得する。
        Args:
            url (str): 接続するURL。
        Raises:
            AccessError: 通信に失敗した場合のエラー。
        Returns:
            str: レスポンスのHTML。
        """
        ...
    @final
     def _random_sleep(self) -> None:
     def _random_sleep(self) -> None:
         """ランダムな秒数スリープする。
         """ランダムな秒数スリープする。
162行目: 180行目:
     """
     """


     def __init__(self) -> None:
     def __init__(self):
         """コンストラクタ。
         """コンストラクタ。
         """
         """
         self._proxies: Final[dict[str, str] | None] = (
         self._proxies: dict[str, str] | None = (
             self._choose_tor_proxies()
             self._choose_tor_proxies()
         )  # Torに必要なプロキシをセット
         )  # Torに必要なプロキシをセット


     def _execute(self,
     def _execute(self,
                 url: str,
                 url: str) -> requests.models.Response:
                proxies: dict[str, str] | None) -> requests.models.Response:
         """引数のURLにRequestsモジュールでHTTP接続する。
         """引数のURLにRequestsモジュールでHTTP接続する。


         Args:
         Args:
             url (str): 接続するURL。
             url (str): 接続するURL。
            proxies (dict[str, str] | None): 接続に利用するプロキシ。
                デフォルトでは :func:`~_choose_tor_proxies` で設定した値を利用する。


         Raises:
         Raises:
             requests.exceptions.ConnectionError: Torで接続ができないなど、接続のエラー。
             requests.exceptions.HTTPError: ステータスコードが200でない場合のエラー。
            AccessError: ステータスコードが200でない場合のエラー。


         Returns:
         Returns:
187行目: 201行目:
         """
         """
         sleep(self.WAIT_TIME)  # DoS対策で待つ
         sleep(self.WAIT_TIME)  # DoS対策で待つ
        res: Final[requests.models.Response] = requests.get(
            url,
            timeout=self.REQUEST_TIMEOUT,
            headers=self.HEADERS,
            allow_redirects=False,
            proxies=getattr(self, '_proxies', None))
        res.raise_for_status()  # HTTPステータスコードが200番台以外でエラー発生
        return res
    @override
    def get(self, url: str) -> str:
         try:
         try:
             res: Final[requests.models.Response] = requests.get(
             return self._execute(url).text
                url,
                timeout=self.REQUEST_TIMEOUT,
                headers=self.HEADERS,
                allow_redirects=False,
                proxies=proxies if proxies is not None
                else getattr(self, '_proxies', None))
            res.raise_for_status() # HTTPステータスコードが200番台以外でエラー発生
            return res
        except requests.exceptions.ConnectionError:
            raise
         except requests.exceptions.RequestException as e:
         except requests.exceptions.RequestException as e:
            # requestsモジュール固有の例外を共通の例外に変換
             raise AccessError from e
             raise AccessError(str(e)) from e
 
    def get(self,
            url: str,
            proxies: dict[str, str] | None = None) -> str:
        """引数のURLにRequestsモジュールでHTTP接続する。
 
        Args:
            url (str): 接続するURL。
            proxies (dict[str, str] | None, optional): 接続に利用するプロキシ。
                デフォルトでは :func:`~_choose_tor_proxies` で設定した値を利用する。
 
        Raises:
            requests.exceptions.ConnectionError: Torで接続ができないなど、接続のエラー。
            AccessError: ステータスコードが200でない場合のエラー。
 
        Returns:
            str: レスポンスのHTML。
        """
        try:
            return self._execute(url, proxies).text
        except (requests.exceptions.ConnectionError, AccessError):
            raise


     def get_image(self, url: str) -> bytes | None:
     def get_image(self, url: str) -> bytes | None:
232行目: 224行目:


         Raises:
         Raises:
            requests.exceptions.ConnectionError: Torで接続ができないなど、接続のエラー。
             AccessError: アクセスに失敗した場合のエラー。
             AccessError: ステータスコードが200でない場合のエラー。


         Returns:
         Returns:
239行目: 230行目:
         """
         """
         try:
         try:
             res: Final[requests.models.Response] = self._execute(url,
             res: Final[requests.models.Response] = self._execute(url)
                                                                self._proxies)
         except requests.exceptions.RequestException as e:
         except (requests.exceptions.ConnectionError, AccessError):
             raise AccessError from e
             raise


         if 'image' in res.headers['content-type']:
         if 'image' in res.headers['content-type']:
265行目: 255行目:
         logger.info('Torのチェック中ですを')
         logger.info('Torのチェック中ですを')
         # プロキシなしでTorにアクセスできるかどうか
         # プロキシなしでTorにアクセスできるかどうか
         res: str = self.get(self.TOR_CHECK_URL, proxies=None)
        self._proxies = None
         res: str = self._execute(self.TOR_CHECK_URL).text
         is_tor: bool = json.loads(res)['IsTor']
         is_tor: bool = json.loads(res)['IsTor']
         if is_tor:
         if is_tor:
273行目: 264行目:
         # Tor BrowserのプロキシでTorにアクセスできるかどうか
         # Tor BrowserのプロキシでTorにアクセスできるかどうか
         try:
         try:
             res = self.get(self.TOR_CHECK_URL,
            self._proxies = self.PROXIES_WITH_BROWSER
                          proxies=self.PROXIES_WITH_BROWSER)
             res = self._execute(self.TOR_CHECK_URL).text
             is_tor = json.loads(res)['IsTor']
             is_tor = json.loads(res)['IsTor']
             if is_tor:
             if is_tor:
284行目: 275行目:
         # torコマンドのプロキシでTorにアクセスできるかどうか
         # torコマンドのプロキシでTorにアクセスできるかどうか
         try:
         try:
             res = self.get(self.TOR_CHECK_URL,
            self._proxies = self.PROXIES_WITH_COMMAND
                          proxies=self.PROXIES_WITH_COMMAND)
             res = self._execute(self.TOR_CHECK_URL).text
             is_tor = json.loads(res)['IsTor']
             is_tor = json.loads(res)['IsTor']
             if is_tor:
             if is_tor:
327行目: 318行目:
     """
     """


     def __init__(self, enable_javascript: bool) -> None:
     def __init__(self, enable_javascript: bool):
         """コンストラクタ。
         """コンストラクタ。


406行目: 397行目:
                     WebDriverWait(
                     WebDriverWait(
                         self._driver,
                         self._driver,
                         self.WAIT_TIME_FOR_RECAPTCHA).until(  # type: ignore
                         self.WAIT_TIME_FOR_RECAPTCHA
                            ec.visibility_of_element_located(
                    ).until(  # type: ignore
                                # bot検知された場合に現れるクラス
                        ec.visibility_of_element_located(
                                (By.CLASS_NAME, 'rc-doscaptcha-header')
                            # bot検知された場合に現れるクラス
                            )
                            (By.CLASS_NAME, 'rc-doscaptcha-header')
                        )
                     )
                     )
                 except InvalidSwitchToTargetException:
                 except InvalidSwitchToTargetException:
419行目: 411行目:
                 else:
                 else:
                     # waitを普通に抜けた場合
                     # waitを普通に抜けた場合
                     logger.warn('botバレしたなりを')
                     logger.warning('botバレしたなりを')
                     # 一回ブラウザを落として起動し直す
                     # 一回ブラウザを落として起動し直す
                     self.quit()
                     self.quit()
428行目: 420行目:
                     f'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: {url}')
                     f'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: {url}')


    @override
     def get(self, url: str) -> str:
     def get(self, url: str) -> str:
        """引数のURLにSeleniumでHTTP接続する。
        Args:
            url (str): 接続するURL。
        Raises:
            AccessError: 通信に失敗した場合のエラー。
        Returns:
            str: レスポンスのHTML。
        """
         self._random_sleep()  # DoS対策で待つ
         self._random_sleep()  # DoS対策で待つ
         try:
         try:
446行目: 428行目:
         except WebDriverException as e:
         except WebDriverException as e:
             # Selenium固有の例外を共通の例外に変換
             # Selenium固有の例外を共通の例外に変換
             raise AccessError(str(e)) from e
             raise AccessError from e
         return self._driver.page_source
         return self._driver.page_source


464行目: 446行目:
     """
     """


     def __init__(self, use_browser: bool, enable_javascript: bool) -> None:
     def __init__(self, use_browser: bool, enable_javascript: bool):
         """コンストラクタ。
         """コンストラクタ。


625行目: 607行目:
     """
     """


     def __init__(self, date: datetime) -> None:
     def __init__(self, date: datetime):
         """コンストラクタ。
         """コンストラクタ。


923行目: 905行目:
     """
     """


     def __init__(self) -> None | NoReturn:
     def __init__(self):
         """コンストラクタ。
         """コンストラクタ。
         """
         """
1,306行目: 1,288行目:
                     tweet_media.select('.attachment.video-container')):
                     tweet_media.select('.attachment.video-container')):
                 if not self._has_ffmpeg:
                 if not self._has_ffmpeg:
                     logger.warn(f'ffmpegがないため{tweet_url}の動画が取得できませんでしたを')
                     logger.warning(f'ffmpegがないため{tweet_url}の動画が取得できませんでしたを')
                     media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
                     media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
                     continue
                     continue
1,341行目: 1,323行目:
                         logger.info(f'{ts_filename}.tsをmp4に変換してアップロードしなければない。')
                         logger.info(f'{ts_filename}.tsをmp4に変換してアップロードしなければない。')
                         media_list.append(f'[[ファイル:{tweet_id}_{i}.mp4|240px]]')
                         media_list.append(f'[[ファイル:{tweet_id}_{i}.mp4|240px]]')
                     case _:
                     case FfmpegStatus.FAILED:
                         logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能')
                         logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能')
                         media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
                         media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
                    case _ as unreachable:  # pyright: ignore [reportUnnecessaryComparison]
                        assert_never(unreachable)


             media_txt = ' '.join(media_list)
             media_txt = ' '.join(media_list)
1,562行目: 1,546行目:
         archive_url: str = urljoin(
         archive_url: str = urljoin(
             self.ARCHIVE_TODAY_STANDARD,
             self.ARCHIVE_TODAY_STANDARD,
             quote(
             quote(unquote(url), safe='&=+?%'))
                unquote(url),
                safe='&=+?%'))
         res: Final[str | None] = accessor.request(urljoin(
         res: Final[str | None] = accessor.request(urljoin(
             self.ARCHIVE_TODAY, quote(unquote(url), safe='&=+?%')))
             self.ARCHIVE_TODAY, quote(unquote(url), safe='&=+?%')))
1,735行目: 1,717行目:
                 self._invidious_instances(accessor)
                 self._invidious_instances(accessor)
             )
             )
             self._invidious_pattern: re.Pattern[str] = re.compile(
             self._invidious_pattern: Pattern[str] = re.compile(
                 '|'.join(invidious_url_tuple))
                 '|'.join(invidious_url_tuple))


1,765行目: 1,747行目:
     """
     """


     TWEET_URL_PREFIX_DEFAULT: Final[str] = '17'
     TWEET_URL_PREFIX_DEFAULT: Final[str] = '171'
     """Final[str]: ツイートURLの数字部分のうち、予め固定しておく部分。
     """Final[str]: ツイートURLの数字部分のうち、予め固定しておく部分。


1,772行目: 1,754行目:
     """
     """


     INCREMENTED_NUM_DEFAULT: Final[int] = 0
     INCREMENTED_NUM_DEFAULT: Final[int] = 6
     """Final[int]: ツイートURLの数字部分うち、インクリメントする桁のデフォルト値。
     """Final[int]: ツイートURLの数字部分うち、インクリメントする桁のデフォルト値。


1,925行目: 1,907行目:
             self._append_tweet_urls(soup)
             self._append_tweet_urls(soup)
             has_next = self._go_to_next(soup, accessor)
             has_next = self._go_to_next(soup, accessor)
             soup = BeautifulSoup(self._page)
             soup = BeautifulSoup(self._page, 'html.parser')


     def _next_url(
     def _next_url(
2,022行目: 2,004行目:
             page: str | None = accessor.request(url_pair.archive_url)
             page: str | None = accessor.request(url_pair.archive_url)
             assert page is not None
             assert page is not None
             soup: BeautifulSoup = BeautifulSoup(page)
             soup: BeautifulSoup = BeautifulSoup(page, 'html.parser')
             if soup.select_one('span[data-testid="socialContext"]') is None:
             if soup.select_one('span[data-testid="socialContext"]') is None:
                 filtered_urls.append(url_pair)
                 filtered_urls.append(url_pair)
匿名利用者