「利用者:夜泣き/スクリプト」の版間の差分

→‎コード: v4.4.2 WikiのURLをkrsw-wiki.inに変更
>Fet-Fe
(→‎コード: v4.3.2 URL一覧に重複が出ないよう修正)
(→‎コード: v4.4.2 WikiのURLをkrsw-wiki.inに変更)
 
(他の1人の利用者による、間の8版が非表示)
11行目: 11行目:
"""Twitter自動収集スクリプト
"""Twitter自動収集スクリプト


ver4.3.2 2024/3/4恒心
ver4.4.2 2024/11/9恒心


当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です。
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です。
19行目: 19行目:
Examples:
Examples:
     定数類は状況に応じて変えてください。:class:`~UserProperties` で変更できます。
     定数類は状況に応じて変えてください。:class:`~UserProperties` で変更できます。
     ::
     ::


24行目: 25行目:


     オプションに ``--krsw`` とつけると自動モードになります。
     オプションに ``--krsw`` とつけると自動モードになります。
     ::
     ::


33行目: 35行目:
     ``--no-browser`` オプションでTor Browserを使用しないモードに、
     ``--no-browser`` オプションでTor Browserを使用しないモードに、
     ``--disable-script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。
     ``--disable-script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。
     ``--search-unarchived`` オプションでは、従来の通りNitterからツイートを収集するモードになります(廃止予定)。
     ``--search-unarchived`` オプションでは、従来の通りNitterからツイートを収集するモードになります (廃止予定)。


Note:
Note:
49行目: 51行目:
     * requests, typing_extensionsも環境によっては入っていないかもしれない
     * requests, typing_extensionsも環境によっては入っていないかもしれない


         * ``$ python3 -m pip install bs4 requests PySocks selenium typing_extensions``
    .. code-block:: bash
 
         $ python3 -m pip install bs4 requests PySocks selenium typing_extensions


     * pipも入っていなければ ``$ sudo apt install pip``
     * pipも入っていなければ ``$ sudo apt install pip``
     * `ffmpeg <https://ffmpeg.org>`_ が入っていると動画も自動取得しますが、無くても動きます
     * `ffmpeg <https://ffmpeg.org>`_ が入っていると動画も自動取得しますが、無くても動きます
     * バグ報告は `利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて \
     * バグ報告は `利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて \
         <https://krsw-wiki.org/wiki/利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて>`_
         <https://krsw-wiki.in/wiki/利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて>`_
"""
"""


import codecs
import json
import json
import logging
import logging
76行目: 79行目:
from datetime import datetime
from datetime import datetime
from enum import Enum
from enum import Enum
from re import Match, Pattern
from pathlib import Path
from time import sleep
from time import sleep
from traceback import TracebackException
from traceback import TracebackException
106行目: 109行目:
     """
     """
     media_dir: Final[str] = 'tweet_media'
     media_dir: Final[str] = 'tweet_media'
     """Final[str]: 画像などのツイートメディアをダウンロードするディレクトリ。
     """Final[str]: 画像などのツイートメディアをダウンロードするディレクトリ。"""
    """


     filename: Final[str] = 'tweet.txt'
     filename: Final[str] = 'tweet.txt'
     """Final[str]: ツイートを保存するファイルの名前。
     """Final[str]: ツイートを保存するファイルの名前。"""
    """


     log_file: Final[str] = 'twitter_archiver.log'
     log_file: Final[str] = 'twitter_archiver.log'
     """Final[str]: ログを保存するファイルの名前。
     """Final[str]: ログを保存するファイルの名前。"""
    """


     @dataclass(init=False, eq=False, frozen=True)
     @dataclass(init=False, eq=False, frozen=True)
     class NitterCrawler:
     class NitterCrawler:
         """``--search-unarchived`` オプション有りの時に利用する設定値。
         """``--search-unarchived`` オプション有りの時に利用する設定値。"""
        """
         limit_n_tweets: Final[int] = 100
         limit_n_tweets: Final[int] = 100
         """Final[int]: 取得するツイート数の上限。
         """Final[int]: 取得するツイート数の上限。"""
        """


         report_interval: Final[int] = 5
         report_interval: Final[int] = 5
         """Final[int]: 記録件数を報告するインターバル。
         """Final[int]: 記録件数を報告するインターバル。"""
        """


         nitter_instance: Final[str] = 'https://nitter.moomoo.me/'  # noqa: E501
         nitter_instance: Final[str] = 'https://nitter.poast.org/'  # noqa: E501
         """Final[str]: Nitterのインスタンス。
         """Final[str]: Nitterのインスタンス。


143行目: 140行目:
     @dataclass(init=False, eq=False, frozen=True)
     @dataclass(init=False, eq=False, frozen=True)
     class ArchiveCrawler:
     class ArchiveCrawler:
         """``--search-unarchived`` オプション無しの時に使用する設定値。
         """``--search-unarchived`` オプション無しの時に使用する設定値。"""
        """
         url_list_filename: Final[str] = 'url_list.txt'
         url_list_filename: Final[str] = 'url_list.txt'
         """Final[str]: URLのリストをダンプするファイル名。
         """Final[str]: URLのリストをダンプするファイル名。"""
        """




170行目: 165行目:


class AccessError(Exception):
class AccessError(Exception):
     """RequestsとSeleniumで共通のアクセスエラー。
     """RequestsとSeleniumで共通のアクセスエラー。"""
    """
     pass
     pass




class ReCaptchaRequiredError(Exception):
class ReCaptchaRequiredError(Exception):
     """JavaScriptがオフの時にreCAPTCHAを要求された場合のエラー。
     """JavaScriptがオフの時にreCAPTCHAを要求された場合のエラー。"""
    """
     pass
     pass




class AbstractAccessor(metaclass=ABCMeta):
class AbstractAccessor(metaclass=ABCMeta):
     """HTTPリクエストでWebサイトに接続するための基底クラス。
     """HTTPリクエストでWebサイトに接続するための基底クラス。"""
    """


     WAIT_TIME: Final[int] = 1
     WAIT_TIME: Final[int] = 1
198行目: 190行目:


     WAIT_RANGE: Final[int] = 5
     WAIT_RANGE: Final[int] = 5
     """Final[int]: ランダムな時間待機するときの待機時間の幅(秒)。
     """Final[int]: ランダムな時間待機するときの待機時間の幅(秒)。"""
    """


     REQUEST_TIMEOUT: Final[int] = 30
     REQUEST_TIMEOUT: Final[int] = 30
     """Final[int]: HTTPリクエストのタイムアウト秒数。
     """Final[int]: HTTPリクエストのタイムアウト秒数。"""
    """


     @abstractmethod
     @abstractmethod
240行目: 230行目:


class RequestsAccessor(AbstractAccessor):
class RequestsAccessor(AbstractAccessor):
     """RequestsモジュールでWebサイトに接続するためのクラス。
     """RequestsモジュールでWebサイトに接続するためのクラス。"""
    """


     HEADERS: Final[dict[str, str]] = {
     HEADERS: Final[dict[str, str]] = {
247行目: 236行目:
             'Mozilla/5.0 (X11; Linux i686; rv:109.0) Gecko/20100101 Firefox/120.0'  # noqa: E501
             'Mozilla/5.0 (X11; Linux i686; rv:109.0) Gecko/20100101 Firefox/120.0'  # noqa: E501
     }
     }
     """Final[dict[str, str]]: HTTPリクエスト時のヘッダ。
     """Final[dict[str, str]]: HTTPリクエスト時のヘッダ。"""
    """


     PROXIES_WITH_BROWSER: Final[dict[str, str]] = {
     PROXIES_WITH_BROWSER: Final[dict[str, str]] = {
254行目: 242行目:
         'https': 'socks5h://127.0.0.1:9150'
         'https': 'socks5h://127.0.0.1:9150'
     }
     }
     """Final[dict[str, str]]: Tor Browserを起動しているときのHTTPプロキシの設定。
     """Final[dict[str, str]]: Tor Browserを起動しているときのHTTPプロキシの設定。"""
    """


     PROXIES_WITH_COMMAND: Final[dict[str, str]] = {
     PROXIES_WITH_COMMAND: Final[dict[str, str]] = {
261行目: 248行目:
         'https': 'socks5h://127.0.0.1:9050'
         'https': 'socks5h://127.0.0.1:9050'
     }
     }
     """Final[dict[str, str]]: torコマンドを起動しているときのHTTPプロキシの設定。
     """Final[dict[str, str]]: torコマンドを起動しているときのHTTPプロキシの設定。"""
    """


     TOR_CHECK_URL: Final[str] = 'https://check.torproject.org/api/ip'
     TOR_CHECK_URL: Final[str] = 'https://check.torproject.org/api/ip'
     """Final[str]: Tor経由で通信しているかチェックするサイトのURL。
     """Final[str]: Tor経由で通信しているかチェックするサイトのURL。"""
    """


     def __init__(self) -> None:
     def __init__(self) -> None:
        """コンストラクタ。
        """
         # Torに必要なプロキシをセット
         # Torに必要なプロキシをセット
         self._cookies: dict[str, str] = {}
         self._cookies: dict[str, str] = {}
375行目: 358行目:
             else:
             else:
                 raise AccessError('サイトにTorのIPでアクセスできていないなりを')
                 raise AccessError('サイトにTorのIPでアクセスできていないなりを')
         except requests.exceptions.ConnectionError as e:
         except requests.exceptions.ConnectionError:
            logger.critical(e)
             logger.critical('通信がTorのSOCKS proxyを経由していないなりを', exc_info=True)
             logger.critical('通信がTorのSOCKS proxyを経由していないなりを')
             sys.exit(1)
             sys.exit(1)


     @property
     @property
     def proxies(self) -> dict[str, str] | None:
     def proxies(self) -> dict[str, str] | None:
         """オブジェクトのプロキシ設定を返す。
         """dict[str, str] | None: プロキシ設定。"""
 
        Returns:
            dict[str, str] | None: プロキシ設定。
        """
         return self._proxies
         return self._proxies


392行目: 370行目:
class SeleniumAccessor(AbstractAccessor):
class SeleniumAccessor(AbstractAccessor):
     """SeleniumでWebサイトに接続するためのクラス。
     """SeleniumでWebサイトに接続するためのクラス。
    Args:
        enable_javascript (bool): JavaScriptを有効にするかどうか。reCAPTCHA対策に必要。
     """
     """


399行目: 380行目:
         'Linux': '/usr/bin/torbrowser'
         'Linux': '/usr/bin/torbrowser'
     })
     })
     """Final[MappingProxyType[str, str]]: OSごとのTor Browserのパス。
     """Final[MappingProxyType[str, str]]: OSごとのTor Browserのパス。"""
    """


     WEB_DRIVER_WAIT_TIME: Final[int] = 15
     WEB_DRIVER_WAIT_TIME: Final[int] = 15
     """Final[int]: 最初のTor接続時の待機時間(秒)。
     """Final[int]: 最初のTor接続時の待機時間(秒)。"""
    """


     WAIT_TIME_FOR_RECAPTCHA: Final[int] = 10_000
     WAIT_TIME_FOR_RECAPTCHA: Final[int] = 10_000
     """Final[int]: reCAPTCHAのための待機時間(秒)。
     """Final[int]: reCAPTCHAのための待機時間(秒)。"""
    """


     def __init__(self, enable_javascript: bool) -> None:
     def __init__(self, enable_javascript: bool) -> None:
        """コンストラクタ。
         self._options: Final[FirefoxOptions] = FirefoxOptions()
 
        Tor Browserを自動操縦するためのSeleniumドライバを初期化する。
 
        Args:
            enable_javascript (bool): JavaScriptを有効にするかどうか。reCAPTCHA対策に必要。
        """
         self._options: Final[FirefoxOptions] = FirefoxOptions()
         self._options.binary_location = (
         self._options.binary_location = (
             self.TOR_BROWSER_PATHS[platform.system()]
             self.TOR_BROWSER_PATHS[platform.system()]
436行目: 407行目:


     def quit(self) -> None:
     def quit(self) -> None:
         """Seleniumドライバを終了する。
         """Seleniumドライバを終了する。"""
        """
         if hasattr(self, '_driver'):
         if hasattr(self, '_driver'):
             logger.debug('ブラウザ終了')
             logger.debug('ブラウザ終了')
443行目: 413行目:


     def _refresh_browser(self) -> None:
     def _refresh_browser(self) -> None:
         """ブラウザを起動する。
         """ブラウザを起動する。"""
        """
         try:
         try:
             logger.debug('ブラウザ起動')
             logger.debug('ブラウザ起動')
468行目: 437行目:


         Args:
         Args:
             url (str): アクセスしようとしているURL。
             url (str): アクセスしようとしているURL。\
                 reCAPTCHAが要求されると `current_url` が変わることがあるので必要。
                 reCAPTCHAが要求されると `current_url` が変わることがあるので必要。


482行目: 451行目:
                 print('reCAPTCHAを解いてね(笑)、それはできるよね。\a\a\a')
                 print('reCAPTCHAを解いてね(笑)、それはできるよね。\a\a\a')
                 print('botバレしたら自動でブラウザが再起動するナリよ')
                 print('botバレしたら自動でブラウザが再起動するナリよ')
                print('Tips: カーソルを迷ったように動かすとか、人間らしく振る舞うのがコツナリ')
                 WebDriverWait(self._driver, self.WEB_DRIVER_WAIT_TIME).until(
                 WebDriverWait(self._driver, self.WEB_DRIVER_WAIT_TIME).until(
                     ec.presence_of_element_located(
                     ec.presence_of_element_located(
522行目: 492行目:
             self._driver.get(url)
             self._driver.get(url)
             WebDriverWait(self._driver, self.WEB_DRIVER_WAIT_TIME).until_not(
             WebDriverWait(self._driver, self.WEB_DRIVER_WAIT_TIME).until_not(
                 ec.title_is('Redirecting'))  # Nitterのリダイレクトを検知する
                 ec.any_of(
                    ec.title_is('Redirecting'),
                    ec.title_is('Verifying your browser | Nitter')
                )
            )  # Nitterのリダイレクトを検知する
             self._check_recaptcha(url)
             self._check_recaptcha(url)
         except WebDriverException as e:
         except WebDriverException as e:
             # Selenium固有の例外を共通の例外に変換
             # Selenium固有の例外を共通の例外に変換
             raise AccessError from e
             raise AccessError from e
        sleep(5)
         return self._driver.page_source
         return self._driver.page_source


541行目: 516行目:


     RequestsとSeleniumのどちらかを選択して使用することができ、その違いを隠蔽する。
     RequestsとSeleniumのどちらかを選択して使用することができ、その違いを隠蔽する。
    Args:
        use_browser (bool): `True` ならSeleniumを利用する。`False` ならRequestsのみでアクセスする。
        enable_javascript (bool): SeleniumでJavaScriptを利用する場合は `True`。
     """
     """


     LIMIT_N_REQUESTS: Final[int] = 5
     LIMIT_N_REQUESTS: Final[int] = 5
     """Final[int]: HTTPリクエスト失敗時の再試行回数。
     """Final[int]: HTTPリクエスト失敗時の再試行回数。"""
    """


     WAIT_TIME_FOR_ERROR: Final[int] = 4
     WAIT_TIME_FOR_ERROR: Final[int] = 4
     """Final[int]: HTTPリクエスト失敗時にさらに追加する待機時間。
     """Final[int]: HTTPリクエスト失敗時にさらに追加する待機時間。"""
    """


     def __init__(self, use_browser: bool, enable_javascript: bool) -> None:
     def __init__(self, use_browser: bool, enable_javascript: bool) -> None:
        """コンストラクタ。
        Requestのみを利用するか、Seleniumも利用するか引数で選択して初期化する。
        Args:
            use_browser (bool): `True` ならSeleniumを利用する。
                `False` ならRequestsのみでアクセスする。
            enable_javascript (bool): SeleniumでJavaScriptを利用する場合は `True`。
        """
         self._selenium_accessor: Final[SeleniumAccessor | None] = (
         self._selenium_accessor: Final[SeleniumAccessor | None] = (
             SeleniumAccessor(enable_javascript) if use_browser else None
             SeleniumAccessor(enable_javascript) if use_browser else None
568行目: 536行目:


     def __enter__(self) -> Self:
     def __enter__(self) -> Self:
        """`with` ブロックの開始時に実行する。
        Returns:
            Self: オブジェクト自身。
        """
         return self
         return self


579行目: 542行目:
                 exc_value: BaseException | None,
                 exc_value: BaseException | None,
                 traceback: TracebackType | None) -> None:
                 traceback: TracebackType | None) -> None:
        """`with` ブロックの終了時に実行する。
        Args:
            exc_type (type[BaseException] | None): コンテキスト内で例外を吐いた場合の例外タイプ。
            exc_value (BaseException | None): コンテキスト内で例外を吐いた場合の例外。
            traceback (TracebackType | None): コンテキスト内で例外を吐いた場合のトレースバック。
        """
         if self._selenium_accessor is not None:
         if self._selenium_accessor is not None:
             self._selenium_accessor.quit()
             self._selenium_accessor.quit()
613行目: 569行目:


     def _request_with_callable[T](
     def _request_with_callable[T](
            self,
        self,
            url: str,
        url: str,
            request_callable: Callable[[str], T]) -> T | None:
        request_callable: Callable[[str], T]
    ) -> T | None:
         """`request_callable` の実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。
         """`request_callable` の実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。


634行目: 591行目:
         for i in range(self.LIMIT_N_REQUESTS):
         for i in range(self.LIMIT_N_REQUESTS):
             try:
             try:
                 res: Final[T] = request_callable(url)
                 res: T = request_callable(url)
             except AccessError:
             except AccessError:
                 logger.warning(
                 logger.warning(
                     url + 'への通信失敗ナリ  '
                     url + 'への通信失敗ナリ  '
                     f'{i + 1}/{self.LIMIT_N_REQUESTS}回')
                     f'{i + 1}/{self.LIMIT_N_REQUESTS}回')
                logger.debug('エラーログ', exc_info=True)
                 if i < self.LIMIT_N_REQUESTS:
                 if i < self.LIMIT_N_REQUESTS:
                     sleep(self.WAIT_TIME_FOR_ERROR)  # 失敗時は長めに待つ
                     sleep(self.WAIT_TIME_FOR_ERROR)  # 失敗時は長めに待つ
710行目: 668行目:
     @property
     @property
     def last_url(self) -> str:
     def last_url(self) -> str:
         """最後にアクセスしたURLを返す。
         """str: 最後にアクセスしたURL。"""
 
        Returns:
            str: 最後にアクセスしたURL。
        """
         return self._last_url
         return self._last_url


     @property
     @property
     def proxies(self) -> dict[str, str] | None:
     def proxies(self) -> dict[str, str] | None:
         """RequestsAccessorオブジェクトのプロキシ設定を返す。
         """dict[str, str] | None: RequestsAccessorオブジェクトのプロキシ設定。"""
 
        Returns:
            dict[str, str] | None: RequestsAccessorオブジェクトのプロキシ設定。
        """
         return self._requests_accessor.proxies
         return self._requests_accessor.proxies


729行目: 679行目:
class TableBuilder:
class TableBuilder:
     """Wikiの表を組み立てるためのクラス。
     """Wikiの表を組み立てるためのクラス。
    Args:
        date (datetime | None, optional): 記録するツイートの最新日付。デフォルトは今日の日付。
    Attributes:
        _tables (list[str]): ツイートのリストを日毎にまとめたもの。\
            一番最後の要素が `_date` に対応し、最初の要素が最近の日付となる。
        _count (int): 表に追加したツイートの件数。
        _date (datetime): 現在収集中のツイートの日付。
     """
     """
     FILENAME: Final[str] = UserProperties.filename
     FILENAME: Final[str] = UserProperties.filename
     """Final[str]: ツイートを保存するファイルの名前。
     """Final[str]: ツイートを保存するファイルの名前。"""
    """


     def __init__(self, date: datetime | None = None) -> None:
     def __init__(self, date: datetime | None = None) -> None:
        """コンストラクタ。
         self._tables: Final[list[str]] = ['']
 
        Args:
            date (datetime | None, optional): 記録するツイートの最新日付。デフォルトは今日の日付。
        """
         self._tables: Final[list[str]] = ['']
         self._count: int = 0  # 記録数
         self._count: int = 0  # 記録数
         self._date: datetime = date or datetime.today()
         self._date: datetime = date or datetime.today()
747行目: 699行目:
     @property
     @property
     def count(self) -> int:
     def count(self) -> int:
         """表に追加したツイートの件数を返す。
         """int: 表に追加したツイートの件数。"""
 
        Returns:
            int: 表に追加したツイートの件数。
        """
         return self._count
         return self._count


769行目: 717行目:


     def dump_file(self) -> None:
     def dump_file(self) -> None:
         """Wikiテーブルをファイル出力する。
         """Wikiテーブルをファイル出力する。"""
        """
         self._next_day()
         self._next_day()
         result_txt: Final[str] = '\n'.join(reversed(self._tables))
         Path(self.FILENAME).write_text(
 
            '\n'.join(reversed(self._tables)), 'utf-8'
        with codecs.open(self.FILENAME, 'w', 'utf-8') as f:
        )
            f.write(result_txt)
         logger.info('テキストファイル手に入ったやで〜')
         logger.info('テキストファイル手に入ったやで〜')


868行目: 814行目:
         if not hasattr(cls, '_escape_callables'):
         if not hasattr(cls, '_escape_callables'):
             # 初回呼び出しの時だけ正規表現をコンパイルする
             # 初回呼び出しの時だけ正規表現をコンパイルする
             head_space_pattern: Final[Pattern[str]] = re.compile(
             head_space_pattern: Final[re.Pattern[str]] = re.compile(
                 r'^ ', re.MULTILINE)
                 r'^ ', re.MULTILINE)
             head_marks_pattern: Final[Pattern[str]] = re.compile(
             head_marks_pattern: Final[re.Pattern[str]] = re.compile(
                 r'^([\*#:;])', re.MULTILINE)
                 r'^([\*#:;])', re.MULTILINE)
             bar_pattern: Final[Pattern[str]] = re.compile(
             bar_pattern: Final[re.Pattern[str]] = re.compile(
                 r'^----', re.MULTILINE)
                 r'^----', re.MULTILINE)


890行目: 836行目:
     @staticmethod
     @staticmethod
     def archive_url(
     def archive_url(
            url: str,
        url: str,
            archived_url: str,
        archived_url: str,
            text: str | None = None) -> str:
        text: str | None = None
    ) -> str:
         """URLをArchiveテンプレートでラップする。
         """URLをArchiveテンプレートでラップする。


924行目: 871行目:


class FfmpegStatus(Enum):
class FfmpegStatus(Enum):
     """ffmpegでの動画保存ステータス。
     """ffmpegでの動画保存ステータス。"""
    """
     MP4 = 1
     MP4 = 1
     """mp4の取得に成功したときのステータス。
     """mp4の取得に成功したときのステータス。"""
    """
     TS = 2
     TS = 2
     """tsの取得までは成功したが、mp4への変換に失敗したときのステータス。
     """tsの取得までは成功したが、mp4への変換に失敗したときのステータス。"""
    """
     FAILED = 3
     FAILED = 3
     """m3u8からtsの取得に失敗したときのステータス。
     """m3u8からtsの取得に失敗したときのステータス。"""
    """




982行目: 925行目:
     """
     """


     TWITTER_URL: Final[str] = 'https://twitter.com/'
     TWITTER_URL: Final[str] = 'https://x.com/'
     """Final[str]: TwitterのURL。
     """Final[str]: TwitterのURL。


995行目: 938行目:
     INVIDIOUS_INSTANCES_URL: Final[str] = \
     INVIDIOUS_INSTANCES_URL: Final[str] = \
         'https://api.invidious.io/instances.json'
         'https://api.invidious.io/instances.json'
     """Final[str]: Invidiousのインスタンスのリストを取得するAPIのURL。
     """Final[str]: Invidiousのインスタンスのリストを取得するAPIのURL。"""
    """


     INVIDIOUS_INSTANCES_TUPLE: Final[tuple[str, ...]] = (
     INVIDIOUS_INSTANCES_TUPLE: Final[tuple[str, ...]] = (
         'piped.kavin.rocks',
         'piped.kavin.rocks',
         'piped.video'
         'piped.video',
        'invidious.poast.org'
     )
     )
     """Final[tuple[str, ...]]: よく使われるInvidiousインスタンスのリスト。
     """Final[tuple[str, ...]]: よく使われるInvidiousインスタンスのリスト。
1,009行目: 952行目:


     CALLINSHOW: Final[str] = 'CallinShow'
     CALLINSHOW: Final[str] = 'CallinShow'
     """Final[str]: 降臨ショーのユーザーネーム。
     """Final[str]: 降臨ショーのユーザーネーム。"""
    """


     LIMIT_N_TWEETS: Final[int] = UserProperties.NitterCrawler.limit_n_tweets
     LIMIT_N_TWEETS: Final[int] = UserProperties.NitterCrawler.limit_n_tweets
     """Final[int]: 取得するツイート数の上限。
     """Final[int]: 取得するツイート数の上限。"""
    """


     REPORT_INTERVAL: Final[int] = UserProperties.NitterCrawler.report_interval
     REPORT_INTERVAL: Final[int] = UserProperties.NitterCrawler.report_interval
     """Final[int]: 記録件数を報告するインターバル。
     """Final[int]: 記録件数を報告するインターバル。"""
    """


     TWEETS_OR_REPLIES: Final[str] = 'with_replies'
     TWEETS_OR_REPLIES: Final[str] = 'with_replies'
     """Final[str]: NitterのURLのドメインとユーザーネーム部分の接続部品。
     """Final[str]: NitterのURLのドメインとユーザーネーム部分の接続部品。"""
    """


     NITTER_ERROR_TITLE: Final[str] = 'Error|nitter'
     NITTER_ERROR_TITLE: Final[str] = 'Error|nitter'
1,043行目: 982行目:


     MEDIA_DIR: Final[str] = UserProperties.media_dir
     MEDIA_DIR: Final[str] = UserProperties.media_dir
     """Final[str]: 画像などのツイートメディアをダウンロードするディレクトリ。
     """Final[str]: 画像などのツイートメディアをダウンロードするディレクトリ。"""
    """


     def __init__(self) -> None:
     def __init__(self) -> None:
        """コンストラクタ。
        """
         self._check_constants()  # スラッシュが抜けてないかチェック
         self._check_constants()  # スラッシュが抜けてないかチェック
         self._has_ffmpeg: Final[bool] = self._check_ffmpeg()  # ffmpegがあるかチェック
         self._has_ffmpeg: Final[bool] = self._check_ffmpeg()  # ffmpegがあるかチェック


         self._img_ext_pattern: Final[Pattern[str]] = re.compile(
         self._img_ext_pattern: Final[re.Pattern[str]] = re.compile(
             r'%2F([^%]*\.(?:jpg|jpeg|png|gif))')
             r'%2F([^%]*\.(?:jpg|jpeg|png|gif))')
         self._url_fragment_pattern: Final[Pattern[str]] = re.compile(
         self._url_fragment_pattern: Final[re.Pattern[str]] = re.compile(
             r'#[^#]*$')
             r'#[^#]*$')
         self._url_query_pattern: Final[Pattern[str]] = re.compile(r'\?.*$')
         self._url_query_pattern: Final[re.Pattern[str]] = re.compile(r'\?.*$')


     def _set_queries(self, accessor: AccessorHandler, krsw: bool) -> bool:
     def _set_queries(
        self, accessor: AccessorHandler, krsw: str | None
    ) -> bool:
         """検索条件を設定する。
         """検索条件を設定する。


1,066行目: 1,004行目:
         Args:
         Args:
             accessor (AccessorHandler): アクセスハンドラ
             accessor (AccessorHandler): アクセスハンドラ
             krsw (bool): Trueの場合、名前が :const:`~CALLINSHOW` になり、
             krsw (str | None): `None` でない場合、名前が :const:`~CALLINSHOW` になり、\
                 クエリと終わりにするツイートが無しになる。
                 クエリと終わりにするツイートが無しになる。\
                更に空文字でもない場合、この引数が終わりにするツイートになる。


         Returns:
         Returns:
1,074行目: 1,013行目:


         # ユーザー名取得
         # ユーザー名取得
         if krsw:
         if krsw is not None:
             logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます')
             logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます')
             self._name: str = self.CALLINSHOW
             self._name: str = self.CALLINSHOW
1,086行目: 1,025行目:
         # 検索クエリとページ取得
         # 検索クエリとページ取得
         self._query_strs: list[str] = []
         self._query_strs: list[str] = []
         if krsw:
         if krsw is not None:
             logger.info('クエリは自動的になしにナリます')
             logger.info('クエリは自動的になしにナリます')
         else:
         else:
1,107行目: 1,046行目:


         # 終わりにするツイート取得
         # 終わりにするツイート取得
         if krsw:
         if krsw == '':
             logger.info('終わりにするツイートは自動的になしにナリます')
             logger.info('終わりにするツイートは自動的になしにナリます')
        self._stop: str = '' if krsw else self._input_stop_word()
            self._stop: str = ''
        elif krsw is not None:
            logger.info(f'終わりにするツイートは自動的に"{krsw}"にナリます')
            self._stop: str = krsw
        else:
            self._stop: str = self._input_stop_word()


         logger.info(
         logger.info(
1,170行目: 1,114行目:
         try:
         try:
             accessor.request_once(self.NITTER_INSTANCE)
             accessor.request_once(self.NITTER_INSTANCE)
         except AccessError as e:
         except AccessError:
            logger.critical(e)
             logger.critical('インスタンスが死んでますを', exc_info=True)
             logger.critical('インスタンスが死んでますを')
             sys.exit(1)
             sys.exit(1)
         logger.info('Nitter OK')
         logger.info('Nitter OK')
1,188行目: 1,131行目:
         try:
         try:
             accessor.request_once(self.ARCHIVE_TODAY)
             accessor.request_once(self.ARCHIVE_TODAY)
         except AccessError as e:  # エラー発生時は終了
         except AccessError:  # エラー発生時は終了
            logger.critical(e)
             logger.critical('インスタンスが死んでますを', exc_info=True)
             logger.critical('インスタンスが死んでますを')
             sys.exit(1)
             sys.exit(1)
         logger.info('archive.today OK')
         logger.info('archive.today OK')


     def _invidious_instances(
     def _invidious_instances(
            self, accessor: AccessorHandler) -> tuple[str, ...]:
        self, accessor: AccessorHandler
    ) -> tuple[str, ...]:
         """Invidiousのインスタンスのタプルを取得する。
         """Invidiousのインスタンスのタプルを取得する。


1,239行目: 1,182行目:
                 + 'になりますを')
                 + 'になりますを')
             print('> ', end='')
             print('> ', end='')
             account_str: Final[str] = input()
             account_str: str = input()
             # 空欄で降臨ショー
             # 空欄で降臨ショー
             if account_str == '':
             if account_str == '':
                 return self.CALLINSHOW
                 return self.CALLINSHOW
             else:
             else:
                 res: Final[str | None] = accessor.request(
                 res: str | None = accessor.request(
                     urljoin(self.NITTER_INSTANCE, account_str))
                     urljoin(self.NITTER_INSTANCE, account_str))
                 if res is None:  # リクエスト失敗判定
                 if res is None:  # リクエスト失敗判定
                     self._on_fail(accessor)
                     self._on_fail(accessor)
                     return None
                     return None
                 soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser')
                 soup: BeautifulSoup = BeautifulSoup(res, 'html.parser')
                 if soup.title == self.NITTER_ERROR_TITLE:
                 if soup.title == self.NITTER_ERROR_TITLE:
                     print(account_str + 'は実在の人物ではありませんでした')
                     print(account_str + 'は実在の人物ではありませんでした')
1,258行目: 1,201行目:


     def _input_query(self) -> None:
     def _input_query(self) -> None:
         """検索クエリを標準入力から取得する。
         """検索クエリを標準入力から取得する。"""
        """
         print('検索クエリを入れるナリ。複数ある時は一つの塊ごとに入力するナリ。空欄を入れると検索開始ナリ。')
         print('検索クエリを入れるナリ。複数ある時は一つの塊ごとに入力するナリ。空欄を入れると検索開始ナリ。')
         print('例:「無能」→改行→「脱糞」→改行→「弟殺し」→改行→空欄で改行')
         print('例:「無能」→改行→「脱糞」→改行→「弟殺し」→改行→空欄で改行')
1,301行目: 1,243行目:


     def _download_media(
     def _download_media(
            self,
        self,
            media_url: str,
        media_url: str,
            media_name: str,
        media_name: str,
            accessor: AccessorHandler) -> bool:
        accessor: AccessorHandler
    ) -> bool:
         """ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。
         """ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。


         Args:
         Args:
             media_url (str): 画像のURL。
             media_url (str): 画像のURL。
             media_name (str): 画像ファイル名。Nitter上のimgタグのsrc属性では、
             media_name (str): 画像ファイル名。Nitter上のimgタグのsrc属性では、\
                 ``/pic/media%2F`` に後続する。
                 ``/pic/media%2F`` に後続する。
             accessor (AccessorHandler): アクセスハンドラ。
             accessor (AccessorHandler): アクセスハンドラ。
1,319行目: 1,262行目:
         image_bytes: Final[bytes | None] = accessor.request_image(media_url)
         image_bytes: Final[bytes | None] = accessor.request_image(media_url)
         if image_bytes is not None:
         if image_bytes is not None:
             with open(os.path.join(self.MEDIA_DIR, media_name), 'wb') as f:
             Path(self.MEDIA_DIR, media_name).write_bytes(image_bytes)
                f.write(image_bytes)
             return True
             return True
         else:
         else:
1,326行目: 1,268行目:


     def _download_m3u8(
     def _download_m3u8(
            self,
        self,
            media_path: str,
        media_url: str,
            ts_filename: str,
        ts_filename: str | None,
            mp4_filename: str,
        mp4_filename: str,
            proxies: dict[str, str] | None) -> FfmpegStatus:
        proxies: dict[str, str] | None
    ) -> FfmpegStatus:
         """ffmpegで動画をダウンロードし、:const:`~MEDIA_DIR` に保存する。
         """ffmpegで動画をダウンロードし、:const:`~MEDIA_DIR` に保存する。


         Args:
         Args:
             media_path (str): 動画のm3u8ファイルのNitter上でのパス。
             media_url (str): 動画のm3u8/mp4ファイルのURL。
             ts_filename (str): m3u8から取得したtsファイルのパス。
             ts_filename (str | None): m3u8から取得したtsファイルのパス。\
             mp4_filename (str): tsファイルから変換したmp4ファイルのパス。
                `None` の場合はtsファイルをダウンロードせず、直接mp4をダウンロードする。
             proxies (dict[str, str] | None): m3u8での通信に用いるプロキシ設定。
             mp4_filename (str): 取得したmp4ファイルのパス。
             proxies (dict[str, str] | None): ffmpegでの通信に用いるプロキシ設定。


         Returns:
         Returns:
             FfmpegStatus: ffmpegでの保存ステータス。
             FfmpegStatus: ffmpegでの保存ステータス。
         """
         """
         returncode: Final[int] = subprocess.run(
         os.makedirs(self.MEDIA_DIR, exist_ok=True)
            [
        if ts_filename is not None:
                'ffmpeg', '-y',
            ts_returncode: Final[int] = subprocess.run(
                '-http_proxy', 'proxies["http"]',
                [
                '-i', urljoin(self.NITTER_INSTANCE, media_path),
                    'ffmpeg', '-y',
                '-c', 'copy', ts_filename
                    '-http_proxy', 'proxies["http"]',
            ] if proxies is not None else [
                    '-i', media_url,
                'ffmpeg', '-y',
                    '-c', 'copy', ts_filename
                '-i', urljoin(self.NITTER_INSTANCE, media_path),
                ] if proxies is not None else [
                '-c', 'copy', ts_filename
                    'ffmpeg', '-y',
             ],
                    '-i', media_url,
            stdout=subprocess.DEVNULL).returncode
                    '-c', 'copy', ts_filename
                ],
                stdout=subprocess.DEVNULL).returncode
 
            # 取得成功したらtsをmp4に変換
             if ts_returncode == 0:
                ts2mp4_returncode: Final[int] = subprocess.run(
                    [
                        'ffmpeg', '-y', '-i', ts_filename,
                        '-acodec', 'copy', '-vcodec', 'copy', mp4_filename
                    ],
                    stdout=subprocess.DEVNULL
                ).returncode
                if ts2mp4_returncode == 0:
                    return FfmpegStatus.MP4
                else:
                    return FfmpegStatus.TS
            else:
                return FfmpegStatus.FAILED


         # 取得成功したらtsをmp4に変換
         else:
        if returncode == 0:
             returncode: Final[int] = subprocess.run(
             ts2mp4_returncode: Final[int] = subprocess.run(
                 [
                 [
                     'ffmpeg', '-y', '-i', ts_filename,
                     'ffmpeg', '-y',
                     '-acodec', 'copy', '-vcodec', 'copy', mp4_filename
                    '-http_proxy', 'proxies["http"]',
                    '-i', media_url,
                     '-c', 'copy', mp4_filename
                ] if proxies is not None else [
                    'ffmpeg', '-y',
                    '-i', media_url,
                    '-c', 'copy', mp4_filename
                 ],
                 ],
                 stdout=subprocess.DEVNULL
                 stdout=subprocess.DEVNULL).returncode
            ).returncode
             if returncode == 0:
             if ts2mp4_returncode == 0:
                 return FfmpegStatus.MP4
                 return FfmpegStatus.MP4
             else:
             else:
                 return FfmpegStatus.TS
                 return FfmpegStatus.FAILED
        else:
            return FfmpegStatus.FAILED


     def _tweet_date(self, url: str) -> datetime:
     def _tweet_date(self, url: str) -> datetime:
1,388行目: 1,352行目:


     def _fetch_tweet_media(
     def _fetch_tweet_media(
            self,
        self,
            tweet: Tag,
        tweet: Tag,
            tweet_url: str,
        tweet_url: str,
            accessor: AccessorHandler) -> str:
        accessor: AccessorHandler
    ) -> str:
         """ツイートの画像や動画を取得する。
         """ツイートの画像や動画を取得する。


1,411行目: 1,376行目:
             for image_a in tweet_media.select('.attachment.image a'):
             for image_a in tweet_media.select('.attachment.image a'):
                 try:
                 try:
                     href: Final[str | list[str] | None] = image_a.get('href')
                     href: str | list[str] | None = image_a.get('href')
                     assert isinstance(href, str)
                     assert isinstance(href, str)
                     img_matched: Final[Match[str] | None] = (
                     img_matched: re.Match[str] | None = (
                         self._img_ext_pattern.search(href))
                         self._img_ext_pattern.search(href))
                     assert img_matched is not None
                     assert img_matched is not None
                     media_name: Final[str] = img_matched.group(1)
                     media_name: str = img_matched.group(1)
                     media_list.append(f'[[ファイル:{media_name}|240px]]')
                     media_list.append(f'[[ファイル:{media_name}|240px]]')
                     if self._download_media(
                     if self._download_media(
                             urljoin(self.TWITTER_MEDIA_URL, media_name),
                             urljoin(self.TWITTER_MEDIA_URL, media_name),
                             media_name,
                             media_name,
                             accessor):
                             accessor):
                         logger.info(
                         logger.info(
                             os.path.join(self.MEDIA_DIR, media_name)
                             os.path.join(self.MEDIA_DIR, media_name)
                             + ' をアップロードしなければない。')
                             + ' をアップロードしなければない。')
                     else:
                     else:
                         logger.info(
                         logger.info(
                             urljoin(self.TWITTER_MEDIA_URL, media_name)
                             urljoin(self.TWITTER_MEDIA_URL, media_name)
                             + ' をアップロードしなければない。')
                             + ' をアップロードしなければない。')
                 except AttributeError:
                 except AttributeError:
                     logger.error(f'{tweet_url}の画像が取得できませんでしたを 当職無能')
                     logger.error(f'{tweet_url}の画像が取得できませんでしたを 当職無能')
                     media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]')
                     media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]')
 
 
             # ツイートの動画の取得。ffmpegが入っていなければ手動で取得すること
             # ツイートの動画の取得。ffmpegが入っていなければ手動で取得すること
             for i, video_container in enumerate(
             for i, video_container in enumerate(
                     tweet_media.select('.attachment.video-container')):
                     tweet_media.select('.attachment.video-container')):
                 if not self._has_ffmpeg:
                 if not self._has_ffmpeg:
                     logger.warning(f'ffmpegがないため{tweet_url}の動画が取得できませんでしたを')
                     logger.warning(f'ffmpegがないため{tweet_url}の動画が取得できませんでしたを')
                     media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
                     media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
                     continue
                     continue
 
 
                 # videoタグがない場合は取得できない
                 # videoタグがない場合は取得できない
                 video: Final[Tag | None] = video_container.select_one('video')
                 video: Tag | None = video_container.select_one('video')
                 if video is None:
                 if video is None:
                     logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能')
                     logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能')
                     media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
                     media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
                     continue
                     continue
 
 
                 data_url: Final[str | list[str] | None] = video.get('data-url')
                # videoタグのdata-url属性またはvideoタグ直下のsourceタグからURLが取得できる
                 assert isinstance(data_url, str)
                 data_url: str | list[str] | None = video.get('data-url')
                 video_matched: Final[Match[str] | None] = re.search(
                source_tag: Tag | None = video.select_one('source')
                    r'[^/]+$', data_url)
                src_url: str | list[str] | None = \
                assert video_matched is not None
                    source_tag.get('src') if source_tag is not None else None
                media_path: Final[str] = unquote(video_matched.group())
                video_url: str | list[str] | None = data_url or src_url
                tweet_id: Final[str] = tweet_url.split('/')[-1]
                 assert isinstance(video_url, str)
                ts_filename: Final[str] = (
                 tweet_id: str = tweet_url.split('/')[-1]
                    f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts'
                if data_url is not None:
                 )
                    # data-url属性からURLを取得した場合
                 mp4_filename: Final[str] = (
                    video_matched: re.Match[str] | None = re.search(
                        r'[^/]+$', video_url)
                    assert video_matched is not None
                    media_path: str = unquote(video_matched.group())
                    media_url: str = urljoin(self.NITTER_INSTANCE, media_path)
                    ts_filename: str | None = (
                        f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts'
                    )
                 else:
                    # sourceタグからURLを取得した場合
                    media_url: str = video_url
                    ts_filename: str | None = None
                 mp4_filename: str = (
                     f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4'
                     f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4'
                 )
                 )


                 match self._download_m3u8(
                 match self._download_m3u8(
                         media_path,
                         media_url,
                         ts_filename,
                         ts_filename,
                         mp4_filename,
                         mp4_filename,
1,527行目: 1,504行目:
             assert poll_info is not None
             assert poll_info is not None
             for poll_meter in poll_meters:
             for poll_meter in poll_meters:
                 poll_choice_value: Final[Tag | None] = poll_meter.select_one(
                 poll_choice_value: Tag | None = poll_meter.select_one(
                     '.poll-choice-value')
                     '.poll-choice-value')
                 assert poll_choice_value is not None
                 assert poll_choice_value is not None
                 ratio: Final[str] = poll_choice_value.text
                 ratio: str = poll_choice_value.text
                 poll_choice_option: Final[Tag | None] = poll_meter.select_one(
                 poll_choice_option: Tag | None = poll_meter.select_one(
                     '.poll-choice-option')
                     '.poll-choice-option')
                 assert poll_choice_option is not None
                 assert poll_choice_option is not None
1,590行目: 1,567行目:
         urls_in_tweet: Final[ResultSet[Tag]] = tag.find_all('a')
         urls_in_tweet: Final[ResultSet[Tag]] = tag.find_all('a')
         for url in urls_in_tweet:
         for url in urls_in_tweet:
             href: Final[str | list[str] | None] = url.get('href')
             href: str | list[str] | None = url.get('href')
             assert isinstance(href, str)
             assert isinstance(href, str)


1,610行目: 1,587行目:
                         and self._invidious_pattern.search(href)):
                         and self._invidious_pattern.search(href)):
                     # Nitter上のYouTubeへのリンクをInvidiousのものから直す
                     # Nitter上のYouTubeへのリンクをInvidiousのものから直す
                     invidious_href: Final[str | list[str] | None] = (
                     invidious_href: str | list[str] | None = (
                         self._invidious_pattern.sub(
                         self._invidious_pattern.sub(
                             'youtube.com' if (
                             'youtube.com' if (
1,630行目: 1,607行目:
             elif url.text.startswith('@'):
             elif url.text.startswith('@'):
                 url_link: str = urljoin(self.TWITTER_URL, href)
                 url_link: str = urljoin(self.TWITTER_URL, href)
                 url_text: Final[str] = url.text
                 url_text: str = url.text
                 url.replace_with(
                 url.replace_with(
                     self._archive_url(
                     self._archive_url(
1,636行目: 1,613行目:


     def _archive_url(
     def _archive_url(
            self,
        self,
            url: str,
        url: str,
            accessor: AccessorHandler,
        accessor: AccessorHandler,
            text: str | None = None) -> str:
        text: str | None = None
    ) -> str:
         """URLをArchiveテンプレートでラップする。
         """URLをArchiveテンプレートでラップする。


1,677行目: 1,655行目:


         取得できれば魚拓ページのURLを返す。
         取得できれば魚拓ページのURLを返す。
         魚拓がなければその旨の警告を表示し、https://archive.ph/https%3A%2F%2Fxxxxxxxxの形で返される。
         魚拓がなければその旨の警告を表示し、https://archive.ph/https%3A%2F%2Fxxxxxxxx の形で返される。
         アクセスに失敗すればその旨の警告を表示し、https://archive.ph/https%3A%2F%2Fxxxxxxxxの形で返される。
         アクセスに失敗すればその旨の警告を表示し、https://archive.ph/https%3A%2F%2Fxxxxxxxx の形で返される。


         Args:
         Args:
1,704行目: 1,682行目:
             else:
             else:
                 assert isinstance(content, Tag)
                 assert isinstance(content, Tag)
                 content_a: Final[Tag | NavigableString | None] = content.find(
                 content_a: Final[Tag | NavigableString | None] = (
                    'a')
                    content.select_one('.TEXT-BLOCK > a'))  # 最新の魚拓を取得
                 assert isinstance(content_a, Tag)
                 assert isinstance(content_a, Tag)
                 href: Final[str | list[str] | None] = content_a.get('href')
                 href: Final[str | list[str] | None] = content_a.get('href')
1,729行目: 1,707行目:
         tweets: Final[list[Tag]] = self._get_timeline_items(soup)
         tweets: Final[list[Tag]] = self._get_timeline_items(soup)
         for tweet in tweets:
         for tweet in tweets:
             tweet_a: Final[Tag | None] = tweet.a
             tweet_a: Tag | None = tweet.a
             assert tweet_a is not None
             assert tweet_a is not None
             if tweet_a.text == self.NEWEST:
             if tweet_a.text == self.NEWEST:
1,750行目: 1,728行目:
                     continue
                     continue


             tweet_link: Final[Tag | NavigableString | None] = tweet.find(
             tweet_link: Tag | NavigableString | None = tweet.find(
                 class_='tweet-link')
                 class_='tweet-link')
             assert isinstance(tweet_link, Tag)
             assert isinstance(tweet_link, Tag)
             href: Final[str | list[str] | None] = tweet_link.get('href')
             href: str | list[str] | None = tweet_link.get('href')
             assert isinstance(href, str)
             assert isinstance(href, str)
             tweet_url: Final[str] = urljoin(
             tweet_url: str = urljoin(
                 self.TWITTER_URL,
                 self.TWITTER_URL,
                 self._url_fragment_pattern.sub('', href))
                 self._url_fragment_pattern.sub('', href))


             # 日付の更新処理
             # 日付の更新処理
             date: Final[datetime] = self._tweet_date(tweet_url)
             date: datetime = self._tweet_date(tweet_url)
             self._table_builder.next_day_if_necessary(date)
             self._table_builder.next_day_if_necessary(date)


             tweet_callinshow_template: Final[str] = self._callinshowlink_url(
             tweet_callinshow_template: str = self._callinshowlink_url(
                 tweet_url, accessor)
                 tweet_url, accessor)
             tweet_content: Final[Tag | NavigableString | None] = tweet.find(
             tweet_content: Tag | NavigableString | None = tweet.find(
                 class_='tweet-content media-body')
                 class_='tweet-content media-body')
             assert isinstance(tweet_content, Tag)
             assert isinstance(tweet_content, Tag)
             self._archive_soup(tweet_content, accessor)
             self._archive_soup(tweet_content, accessor)
             media_txt: Final[str] = self._fetch_tweet_media(
             media_txt: str = self._fetch_tweet_media(
                 tweet, tweet_url, accessor)
                 tweet, tweet_url, accessor)
             quote_txt: Final[str] = self._get_tweet_quote(tweet, accessor)
             quote_txt: str = self._get_tweet_quote(tweet, accessor)
             poll_txt: Final[str] = self._get_tweet_poll(tweet)
             poll_txt: str = self._get_tweet_poll(tweet)
             self._table_builder.append(
             self._table_builder.append(
                 tweet_callinshow_template, '<br>\n'.join(
                 tweet_callinshow_template, '<br>\n'.join(
1,814行目: 1,792行目:
         new_url: str = ''
         new_url: str = ''
         for show_more in show_mores:  # show-moreに次ページへのリンクか前ページへのリンクがある
         for show_more in show_mores:  # show-moreに次ページへのリンクか前ページへのリンクがある
             show_more_a: Final[Tag | None] = show_more.a
             show_more_a: Tag | None = show_more.a
             assert show_more_a is not None
             assert show_more_a is not None
             href: Final[str | list[str] | None] = show_more_a.get('href')
             href: str | list[str] | None = show_more_a.get('href')
             assert isinstance(href, str)
             assert isinstance(href, str)
             new_url = urljoin(
             new_url = urljoin(
1,841行目: 1,819行目:


     def _signal_handler(
     def _signal_handler(
            self, signum: int, frame: FrameType | None) -> NoReturn:
        self, signum: int, frame: FrameType | None
    ) -> NoReturn:
         """ユーザがCtrl + Cでプログラムを止めたときのシグナルハンドラ。
         """ユーザがCtrl + Cでプログラムを止めたときのシグナルハンドラ。


1,858行目: 1,837行目:
                 'See also: https://nitter.cz'
                 'See also: https://nitter.cz'
                 '\033[0m')
                 '\033[0m')
     def execute(self, krsw: bool = False, use_browser: bool = True,
     def execute(
                enable_javascript: bool = True) -> None:
        self,
        krsw: str | None = None,
        use_browser: bool = True,
        enable_javascript: bool = True
    ) -> None:
         """通信が必要な部分のロジック。
         """通信が必要な部分のロジック。


         Args:
         Args:
             krsw (bool, optional): `True` の場合、名前が自動で :const:`~CALLINSHOW` になり、
             krsw (str | None, optional): `None` でない場合、名前が自動で \
                 クエリと終わりにするツイートが自動で無しになる。
                :const:`~CALLINSHOW` になり、クエリと終わりにするツイートが自動で無しになる。\
             use_browser (bool, optional): `True` ならSeleniumを利用する。
                 更に空文字でもない場合、この引数が終わりにするツイートになる。
             use_browser (bool, optional): `True` ならSeleniumを利用する。\
                 `False` ならRequestsのみでアクセスする。
                 `False` ならRequestsのみでアクセスする。
             enable_javascript (bool, optional): SeleniumでJavaScriptを利用する場合は
             enable_javascript (bool, optional): SeleniumでJavaScriptを利用する場合は \
                 `True`。
                 `True`。


1,884行目: 1,868行目:
                 self._invidious_instances(accessor)
                 self._invidious_instances(accessor)
             )
             )
             self._invidious_pattern: Pattern[str] = re.compile(
             self._invidious_pattern: re.Pattern[str] = re.compile(
                 '|'.join(invidious_url_tuple))
                 '|'.join(invidious_url_tuple))


1,899行目: 1,883行目:
                     if not self._go_to_new_page(accessor):
                     if not self._go_to_new_page(accessor):
                         break
                         break
             except BaseException as e:
             except BaseException:
                 logger.critical('予想外のエラーナリ。ここまでの成果をダンプして終了するナリ。')
                 logger.critical('予想外のエラーナリ。ここまでの成果をダンプして終了するナリ。')
                 self._table_builder.dump_file()
                 self._table_builder.dump_file()
                 raise e
                 raise




class UrlTuple(NamedTuple):
class UrlTuple(NamedTuple):
     """URLとその魚拓のURLのペア。
     """URLとその魚拓のURLのペア。"""
    """
     url: str
     url: str
     """URL。
     """URL。"""
    """
     archive_url: str
     archive_url: str
     """魚拓のURL。
     """魚拓のURL。"""
    """




1,929行目: 1,910行目:
         * ちゃんとテストする。
         * ちゃんとテストする。
     """
     """
    WIKI_URL: Final[str] = 'https://krsw-wiki.in'
    """Final[str]: WikiのURL。"""


     TEMPLATE_URL: Final[str] = 'https://krsw-wiki.org/wiki/テンプレート:降臨ショー恒心ログ'
     TEMPLATE_URL: Final[str] = WIKI_URL + '/wiki/テンプレート:降臨ショー恒心ログ'
     """Final[str]: テンプレート:降臨ショー恒心ログのURL。
     """Final[str]: テンプレート:降臨ショー恒心ログのURL。"""
    """


     URL_LIST_FILENAME: Final[str] = \
     URL_LIST_FILENAME: Final[str] = \
         UserProperties.ArchiveCrawler.url_list_filename
         UserProperties.ArchiveCrawler.url_list_filename
     """Final[str]: URLのリストをダンプするファイル名。
     """Final[str]: URLのリストをダンプするファイル名。"""
    """


     @override
     @override
     def _set_queries(self, accessor: AccessorHandler, krsw: bool) -> bool:
     def _set_queries(
        self, accessor: AccessorHandler, krsw: str | None
    ) -> bool:
         """検索条件を設定する。
         """検索条件を設定する。


1,948行目: 1,931行目:
         Args:
         Args:
             accessor (AccessorHandler): アクセスハンドラ
             accessor (AccessorHandler): アクセスハンドラ
             krsw (bool): `True` の場合、名前が :const:`~CALLINSHOW` になる。
             krsw (str | None): `None` でない場合、名前が :const:`~CALLINSHOW` になる。


         Returns:
         Returns:
1,955行目: 1,938行目:


         # ユーザー名取得
         # ユーザー名取得
         if krsw:
         if krsw is not None:
             logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます')
             logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます')
             self._name: str = self.CALLINSHOW
             self._name: str = self.CALLINSHOW
1,976行目: 1,959行目:
             + ', 最古のURL: ' + self._oldest_url + '*' + 'で検索しまふ'
             + ', 最古のURL: ' + self._oldest_url + '*' + 'で検索しまふ'
         )
         )
         self._twitter_url_pattern: Pattern[str] = re.compile(
         self._twitter_url_pattern: re.Pattern[str] = re.compile(
             '^' + self.TWITTER_URL + self._name + r'/status/\d+')
             '^' + self.TWITTER_URL + self._name + r'/status/\d+')
         self._archive_rt_pattern: Pattern[str] = re.compile(
         self._archive_rt_pattern: re.Pattern[str] = re.compile(
             r'on (?:Twitter|X): "RT @\w+:.+"(?:$| / Twitter$| / X$)')
             r'on (?:Twitter|X): "RT @\w+:.+"(?:$| / Twitter$| / X$)')


2,033行目: 2,016行目:


         urls: Final[list[str]] = list(map(
         urls: Final[list[str]] = list(map(
             lambda x: 'https://krsw-wiki.org' + assert_get(x, 'href'),
             lambda x: self.WIKI_URL + assert_get(x, 'href'),
             template_soup.select('.wikitable > tbody > tr > td a')))
             template_soup.select('.wikitable > tbody > tr > td a')))
         for url in urls:
         for url in urls:
             logger.info(f'{unquote(url)} で収集済みツイートを探索中でふ')
             logger.info(f'{unquote(url)} で収集済みツイートを探索中でふ')
             page: Final[str | None] = (
             page: str | None = accessor.request_with_requests_module(url)
                accessor.request_with_requests_module(url))
             assert page is not None
             assert page is not None
             soup: Final[BeautifulSoup] = BeautifulSoup(page, 'html.parser')
             soup: BeautifulSoup = BeautifulSoup(page, 'html.parser')
             url_as = soup.select('tr > th a')
             url_as = soup.select('tr > th a')
             assert len(url_as) > 0
             assert len(url_as) > 0
2,061行目: 2,043行目:
         for tweet in tweets:
         for tweet in tweets:
             # リダイレクトがあればすべてのリンクを、ないなら目的のURLだけが取得できる
             # リダイレクトがあればすべてのリンクを、ないなら目的のURLだけが取得できる
             urls: Final[list[str]] = list(map(
             urls: list[str] = list(map(
                 lambda a: a.text, tweet.select('a')[1:]))
                 lambda a: a.text, tweet.select('a')[1:]))
             # 別のユーザへのリダイレクトがあるものは除く
             # 別のユーザへのリダイレクトがあるものは除く
2,068行目: 2,050行目:
                 continue
                 continue


             url_matched: Final[Match[str]] | None = next(
             url_matched: Final[re.Match[str]] | None = next(
                 filter(
                 filter(
                     lambda x: x is not None,
                     lambda x: x is not None,
2,078行目: 2,060行目:
                     logger.debug(url_matched.string + 'は最古の探索対象よりも古いのでポア')
                     logger.debug(url_matched.string + 'は最古の探索対象よりも古いのでポア')
                     continue
                     continue
                 a_first_child: Final[Tag | None] = tweet.select_one(
                 a_first_child: Tag | None = tweet.select_one('a:first-child')
                    'a:first-child')
                 assert a_first_child is not None
                 assert a_first_child is not None
                 archive_url: Final[str | list[str] | None] = a_first_child.get(
                 archive_url: str | list[str] | None = a_first_child.get('href')
                    'href')
                 assert isinstance(archive_url, str)
                 assert isinstance(archive_url, str)
                 if url_matched[0] not in self._url_list_on_wiki:
                 if url_matched[0] not in self._url_list_on_wiki:
2,098行目: 2,078行目:


     def _fetch_next_page(
     def _fetch_next_page(
            self, soup: Tag, accessor: AccessorHandler) -> str | None:
        self, soup: Tag, accessor: AccessorHandler
    ) -> str | None:
         """archive.todayの検索結果のページをpaginateする。
         """archive.todayの検索結果のページをpaginateする。


2,128行目: 2,109行目:
         while has_next:
         while has_next:
             self._append_tweet_urls(soup)
             self._append_tweet_urls(soup)
             next_page: Final[str | None] = self._fetch_next_page(
             next_page: str | None = self._fetch_next_page(soup, accessor)
                soup, accessor)
             if next_page is not None:
             if next_page is not None:
                 soup = BeautifulSoup(next_page, 'html.parser')
                 soup = BeautifulSoup(next_page, 'html.parser')
2,136行目: 2,116行目:


     def _next_url(
     def _next_url(
            self,
        self,
            accessor: AccessorHandler,
        accessor: AccessorHandler,
            tweet_url_prefix: str,
        tweet_url_prefix: str,
            incremented_num: int,
        incremented_num: int,
            incremented: bool = False) -> None:
        incremented: bool = False
    ) -> None:
         """ツイートのURLを、数字部分をインクリメントしながら探索する。
         """ツイートのURLを、数字部分をインクリメントしながら探索する。


2,180行目: 2,161行目:
         pager: Final[Tag | None] = soup.select_one('#pager')
         pager: Final[Tag | None] = soup.select_one('#pager')
         if pager is not None:  # 検索結果が複数ページ
         if pager is not None:  # 検索結果が複数ページ
             page_num_matched: Final[Match[str] | None] = re.search(
             page_num_matched: Final[re.Match[str] | None] = re.search(
                 r'of (\d+) urls', pager.text)
                 r'of (\d+) urls', pager.text)
             assert page_num_matched is not None
             assert page_num_matched is not None
2,218行目: 2,199行目:
                           True)
                           True)


     def _parse_images(self, soup: Tag,
     def _parse_images(
                      accessor: AccessorHandler) -> tuple[str, ...]:
        self, soup: Tag, accessor: AccessorHandler
    ) -> tuple[str, ...]:
         """ツイートの魚拓から画像をダウンロードし、ファイル名のタプルを返す。
         """ツイートの魚拓から画像をダウンロードし、ファイル名のタプルを返す。


2,271行目: 2,253行目:
         if len(internal_a_tags) > 0:
         if len(internal_a_tags) > 0:
             for a_tag in internal_a_tags:
             for a_tag in internal_a_tags:
                 account_name: Final[str] = a_tag.text
                 account_name: str = a_tag.text
                 if account_name.startswith('#'):
                 if account_name.startswith('#'):
                     a_tag.replace_with(a_tag.text)
                     a_tag.replace_with(a_tag.text)
                 else:
                 else:
                     url: Final[str] = urljoin(
                     url: str = urljoin(self.TWITTER_URL, account_name[1:])
                        self.TWITTER_URL, account_name[1:])
                     a_tag.replace_with(TableBuilder.archive_url(
                     a_tag.replace_with(TableBuilder.archive_url(
                         url,
                         url,
2,284行目: 2,265行目:
         # 通常のリンク
         # 通常のリンク
         a_tags: Final[ResultSet[Tag]] = tag.select(
         a_tags: Final[ResultSet[Tag]] = tag.select(
             'div[data-testid="tweetText"]:not('
             'div[dir="auto"] > a:not('
             'div[role="link"] div[data-testid="tweetText"]) > a')
             'div[role="link"] div[dir="auto"] > a)')
         for a_tag in a_tags:
         for a_tag in a_tags:
             a_tag.replace_with(
             a_tag.replace_with(
2,372行目: 2,353行目:


     def _get_tweet_from_archive(
     def _get_tweet_from_archive(
            self,
        self,
            url_pairs: list[UrlTuple],
        url_pairs: list[UrlTuple],
            accessor: AccessorHandler) -> None:
        accessor: AccessorHandler
    ) -> None:
         """魚拓からツイート本文を取得する。
         """魚拓からツイート本文を取得する。


2,384行目: 2,366行目:


         for url_pair in url_pairs:
         for url_pair in url_pairs:
             page: Final[str | None] = accessor.request(url_pair.archive_url)
             page: str | None = accessor.request(url_pair.archive_url)
             assert page is not None
             assert page is not None
             article: Final[Tag | None] = BeautifulSoup(
             article: Tag | None = BeautifulSoup(
                 page, 'html.parser'
                 page, 'html.parser'
             ).select_one('article[tabindex="-1"]')
             ).select_one('article[tabindex="-1"]')
2,394行目: 2,376行目:


                 logger.debug(url_pair.url + 'を整形しますを')
                 logger.debug(url_pair.url + 'を整形しますを')
                 tweet_date: Final[datetime] = self._tweet_date(url_pair.url)
                 tweet_date: datetime = self._tweet_date(url_pair.url)
                 table_builder.next_day_if_necessary(tweet_date)
                 table_builder.next_day_if_necessary(tweet_date)


                 tweet_callinshow_template: str = (
                 tweet_callinshow_template: str = (
                     self._callinshowlink_url(url_pair.url, accessor))
                     TableBuilder.callinshowlink_url(
                        url_pair.url, url_pair.archive_url.replace(
                            self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD)))


                 # 本文
                 # 本文
                 try:
                 try:
                     text_tag: Tag | None = article.select_one(
                     text_tag: Tag | None = article.select_one(
                         'div[data-testid="tweetText"]:not('
                         'div[dir="auto"]')
                        'div[role="link"] div[data-testid="tweetText"])')
                     if text_tag is not None:
                     if text_tag is not None:
                         text_tag = self._retrieve_emojis(text_tag)
                         text_tag = self._retrieve_emojis(text_tag)
2,413行目: 2,396行目:


                     # YouTube等のリンク
                     # YouTube等のリンク
                     card_tag: Final[Tag | None] = article.select_one(
                     card_tag: Tag | None = article.select_one(
                         'div[data-testid="card.layoutSmall.media"]:not('
                         'div[aria-label="Play"]:not(div[role="link"] '
                        'div[role="link"] '
                         'div[aria-label="Play"])')
                         'div[data-testid="card.layoutSmall.media"])')
                     if card_tag is not None:
                     if card_tag is not None:
                         text = self._concat_texts(
                         text = self._concat_texts(
2,423行目: 2,405行目:


                     # 画像に埋め込まれた外部サイトへのリンク
                     # 画像に埋め込まれた外部サイトへのリンク
                     article_tag: Final[Tag | None] = article.select_one(
                     article_tag: Tag | None = article.select_one(
                         'article div[data-testid="card.layoutLarge.media"] a')
                         'a[role="link"][aria-label] img:not(div[role="link"] '
                        'a[role="link"][aria-label] img)')
                     if article_tag is not None:
                     if article_tag is not None:
                         text = self._concat_texts(
                         text = self._concat_texts(
2,431行目: 2,414行目:


                     # 引用の有無のチェック
                     # 引用の有無のチェック
                     retweet_tag: Final[Tag | None] = article.select_one(
                     retweet_tag: Tag | None = article.select_one(
                         'article[data-testid="tweet"] div[role="link"]')
                         'div[role="link"]')
                     if retweet_tag is not None:
                     if retweet_tag is not None:
                         account_name_tag: Final[Tag | None] = (
                         account_name_tag: Tag | None = (
                             retweet_tag.select_one(
                             retweet_tag.select_one(
                                 'div[data-testid="User-Name"] div[tabindex="-1"]'))  # noqa: E501
                                 'div[tabindex="-1"] > div > span:not(:has(> *))'))  # noqa: E501
                         assert account_name_tag is not None
                         assert account_name_tag is not None
                         text = self._concat_texts(
                         text = self._concat_texts(
2,452行目: 2,435行目:


                     # 投票の処理
                     # 投票の処理
                     poll_txt: Final[str] = self._get_tweet_poll(article)
                     poll_txt: str = self._get_tweet_poll(article)
                     text = self._concat_texts(text, poll_txt)
                     text = self._concat_texts(text, poll_txt)


                     # バージョンの処理
                     # バージョンの処理
                     if article.select_one(
                     possible_version_text_tags: ResultSet[Tag] = (
                        'svg[aria-label="There’s a new version of this post."]'
                        article.select(
                    ) is not None:
                            'div > span > '
                            'span:not(:has(> *)):not(div[role="link"] div > '
                            'span > span:not(:has(> *)))'))
                    if len(list(filter(
                        lambda x: x.text
                            == 'There’s a new version of this post.',
                            possible_version_text_tags))) > 0:
                         tweet_callinshow_template += '([[新しいバージョンがあります|後の版→]])'
                         tweet_callinshow_template += '([[新しいバージョンがあります|後の版→]])'


                 except Exception as e:
                 except Exception as e:
                     # エラーが起きても止めない
                     # エラーが起きても止めない
                     logger.exception(e)
                     logger.exception('エラーが発生してツイートが取得できませんでしたを', exc_info=True)
                     text = 'エラーが発生してツイートが取得できませんでした\n' + ''.join(
                     text = 'エラーが発生してツイートが取得できませんでした\n' + ''.join(
                         TracebackException.from_exception(e).format())
                         TracebackException.from_exception(e).format())
                 table_builder.append(tweet_callinshow_template, text)
                 table_builder.append(tweet_callinshow_template, text)
             else:
             else:
                 logger.warn(url_pair.url + 'はリツイートなので飛ばすナリ。'
                 logger.warning(url_pair.url + 'はリツイートなので飛ばすナリ。'
                            'URLリスト収集の時点でフィルタできなかった、これはいけない。')
                              'URLリスト収集の時点でフィルタできなかった、これはいけない。')


         table_builder.dump_file()
         table_builder.dump_file()


     @override
     @override
     def execute(self, krsw: bool = False, use_browser: bool = True,
     def execute(
                enable_javascript: bool = True) -> None:
        self,
        krsw: str | None = None,
        use_browser: bool = True,
        enable_javascript: bool = True
    ) -> None:
         """通信が必要な部分のロジック。
         """通信が必要な部分のロジック。


         Args:
         Args:
             krsw (bool, optional): `True` の場合、名前が自動で :const:`~CALLINSHOW` になり、
             krsw (str | None, optional): `None` でない場合、名前が自動で \
                クエリと終わりにするツイートが自動で無しになる。
                :const:`~CALLINSHOW` になる。
             use_browser (bool, optional): `True` ならSeleniumを利用する。
             use_browser (bool, optional): `True` ならSeleniumを利用する。\
                 `False` ならRequestsのみでアクセスする。
                 `False` ならRequestsのみでアクセスする。
             enable_javascript (bool, optional): SeleniumでJavaScriptを利用する場合は
             enable_javascript (bool, optional): SeleniumでJavaScriptを利用する場合は \
                 `True`。
                 `True`。
         """
         """
2,502行目: 2,495行目:
                 sys.exit(1)
                 sys.exit(1)
             # Wikiに既に掲載されているツイートのURLを取得
             # Wikiに既に掲載されているツイートのURLを取得
             self._get_tweet_urls_from_wiki(accessor)
             self._get_tweet_urls_from_wiki(accessor) # Wikiに接続できない時はここをコメントアウト


             # 未掲載のツイートのURLを取得する
             # 未掲載のツイートのURLを取得する
2,509行目: 2,502行目:


             # URL一覧ファイルのダンプ
             # URL一覧ファイルのダンプ
             with codecs.open(self.URL_LIST_FILENAME, 'w', 'utf-8') as f:
             with Path(self.URL_LIST_FILENAME).open('w', encoding='utf-8') as f:
                 for url_pair in self._url_list:
                 for url_pair in self._url_list:
                     f.write(url_pair.url + '\n')
                     f.write(url_pair.url + '\n')
2,532行目: 2,525行目:
     parser.add_argument(
     parser.add_argument(
         '--krsw',
         '--krsw',
         action='store_true',
         type=str,
         help='指定すると、パカデブのツイートを取得上限数まで取得する。')
        nargs='?',
        const='',
        default=None,
         help=('指定すると、パカデブのツイートを取得上限数まで取得する。'
              '更に--search-unarchivedモードでこのオプションに引数を与えると、'
              'その文言が終わりにするツイートになる。'))
     parser.add_argument(
     parser.add_argument(
         '-n',
         '-n',
91

回編集