「利用者:夜泣き/スクリプト」の版間の差分

→‎コード: v4.0.3 pythonのバージョン指定を3.11以上に変更テーブル組み立てを別のクラスに逃す
>Fet-Fe
(→‎コード: v4.0.2 JavaScriptをオフにするオプションと、Tor Browserを利用しないオプションを追加)
>Fet-Fe
(→‎コード: v4.0.3 pythonのバージョン指定を3.11以上に変更テーブル組み立てを別のクラスに逃す)
7行目: 7行目:
"""Twitter自動収集スクリプト
"""Twitter自動収集スクリプト


ver4.0.2 2023/9/25恒心
ver4.0.3 2023/9/28恒心


当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です
23行目: 23行目:
         $ python3 (ファイル名) --krsw
         $ python3 (ファイル名) --krsw


     ``--no_use_browser`` オプションでTor Browserを使用しないモードに、``--disable_script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。
     ``--no_browser`` オプションでTor Browserを使用しないモードに、
    ``--disable_script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。


     自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。
     自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。
29行目: 30行目:


Note:
Note:
     * Pythonのバージョンは3.10以上
     * Pythonのバージョンは3.11以上
     * 環境は玉葱前提です。
     * 環境は玉葱前提です。
     * Whonix-Workstation, MacOSで動作確認済
     * Whonix-Workstation, MacOSで動作確認済
45行目: 46行目:
"""
"""


#インポート類
import sys
import os
import codecs
import codecs
import re
import json
import json
import logging
import os
import platform
import random
import random
import re
import subprocess
import sys
import warnings
from argparse import ArgumentParser, Namespace
from collections.abc import Callable
from datetime import datetime
from datetime import datetime
from zoneinfo import ZoneInfo
from logging import Logger, getLogger
from time import sleep
from time import sleep
from types import MappingProxyType
from types import MappingProxyType, TracebackType
from typing import Final, NoReturn, Any, Self
from typing import Any, Final, NoReturn, Self
from collections.abc import Callable
from urllib.parse import quote, unquote, urljoin
from urllib.parse import quote, unquote, urljoin
import warnings
from zoneinfo import ZoneInfo
import subprocess
import platform
from logging import getLogger, Logger
from argparse import ArgumentParser, Namespace


import requests
import requests
import bs4
from bs4 import BeautifulSoup
from bs4 import BeautifulSoup
from bs4.element import NavigableString, ResultSet, Tag
from selenium import webdriver
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, WebDriverException
from selenium.common.exceptions import (NoSuchElementException,
                                        WebDriverException)
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.by import By


logging.basicConfig(level=logging.INFO,
                    format='{asctime} [{levelname:.4}] : {message}', style='{')
logger: Logger = getLogger(__name__)
logger: Logger = getLogger(__name__)


##おそらくはツイートの内容によってMarkupResemblesLocatorWarningが吐き出されることがあるので無効化
# おそらくはツイートの内容によってMarkupResemblesLocatorWarningが吐き出されることがあるので無効化
warnings.simplefilter('ignore')
warnings.simplefilter('ignore')


87行目: 91行目:




class ReCaptchaFoundError(Exception):
class ReCaptchaRequiredError(Exception):
     """JavaScriptがオフの時にreCAPTCHAを要求された場合のエラー。
     """JavaScriptがオフの時にreCAPTCHAを要求された場合のエラー。
     """
     """
118行目: 122行目:
         自動操縦だとWebサイトに見破られないため。
         自動操縦だとWebサイトに見破られないため。
         """
         """
         sleep(random.randrange(self.WAIT_TIME, self.WAIT_TIME + self.WAIT_RANGE))
         sleep(random.randrange(self.WAIT_TIME,
                              self.WAIT_TIME + self.WAIT_RANGE))




153行目: 158行目:
         """
         """
         self._proxies: dict[str, str] | None = None
         self._proxies: dict[str, str] | None = None
         self._proxies = self._get_tor_proxies() ##Torに必要なプロキシをセット
         self._proxies = self._choose_tor_proxies() # Torに必要なプロキシをセット


     def _execute(self, url: Final[str], proxies: dict[str, str] | None) -> requests.models.Response:
     def _execute(self,
                url: str,
                proxies: dict[str,
                              str] | None) -> requests.models.Response:
         """引数のURLにrequestsモジュールでHTTP接続する。
         """引数のURLにrequestsモジュールでHTTP接続する。


         Args:
         Args:
             url Final[str]: 接続するURL。
             url str: 接続するURL。
             proxies dir[str, str]: 接続に利用するプロキシ。デフォルトでは :func:`~_get_tor_proxies` で設定した値を利用する。
             proxies dir[str, str]: 接続に利用するプロキシ。
            デフォルトでは :func:`~_choose_tor_proxies` で設定した値を利用する。


         Returns:
         Returns:
169行目: 178行目:
             AccessError: ステータスコードが200でない場合のエラー。
             AccessError: ステータスコードが200でない場合のエラー。
         """
         """
         sleep(self.WAIT_TIME) ##DoS対策で待つ
         sleep(self.WAIT_TIME) # DoS対策で待つ
         try:
         try:
             res: requests.models.Response = requests.get(url, timeout=self.REQUEST_TIMEOUT, headers=self.HEADERS, allow_redirects=False, proxies=proxies if proxies is not None else self._proxies)
             res: requests.models.Response = requests.get(
             res.raise_for_status() ##HTTPステータスコードが200番台以外でエラー発生
                url,
                timeout=self.REQUEST_TIMEOUT,
                headers=self.HEADERS,
                allow_redirects=False,
                proxies=proxies if proxies is not None else self._proxies)
             res.raise_for_status() # HTTPステータスコードが200番台以外でエラー発生
         except requests.exceptions.ConnectionError:
         except requests.exceptions.ConnectionError:
             raise
             raise
         except requests.exceptions.RequestException as e:
         except requests.exceptions.RequestException as e:
             # requestsモジュール固有の例外を共通の例外に変換
             # requestsモジュール固有の例外を共通の例外に変換
             raise AccessError(str(e)) from None
             raise AccessError(str(e)) from e
         return res
         return res


     def get(self, url: Final[str], proxies: dict[str, str]=None) -> str:
     def get(self,
            url: str,
            proxies: dict[str,
                          str] | None = None) -> str:
         """引数のURLにrequestsモジュールでHTTP接続する。
         """引数のURLにrequestsモジュールでHTTP接続する。


         Args:
         Args:
             url Final[str]: 接続するURL。
             url str: 接続するURL。
             proxies dir[str, str]: 接続に利用するプロキシ。デフォルトでは :func:`~_get_tor_proxies` で設定した値を利用する。
             proxies dir[str, str]: 接続に利用するプロキシ。デフォルトでは :func:`~_choose_tor_proxies` で設定した値を利用する。


         Returns:
         Returns:
199行目: 216行目:
             raise
             raise


     def get_image(self, url: Final[str]) -> bytes | None:
     def get_image(self, url: str) -> bytes | None:
         """引数のURLから画像のバイナリ列を取得する。
         """引数のURLから画像のバイナリ列を取得する。


         Args:
         Args:
             url Final[str]: 接続するURL
             url str: 接続するURL


         Returns:
         Returns:
222行目: 239行目:
             return None
             return None


     def _get_tor_proxies(self) -> dict[str, str] | None | NoReturn:
     def _choose_tor_proxies(self) -> dict[str, str] | None | NoReturn:
         """Torを使うのに必要なプロキシ情報を返す。
         """Torを使うのに必要なプロキシ情報を返す。


         プロキシなしで接続できればNone、Tor Browserのプロキシで接続できるなら :const:`~PROXIES_WITH_BROWSER`、torコマンドのプロキシで接続できるなら :const:`~PROXIES_WITH_COMMAND`。
         プロキシなしで接続できればNone、
        Tor Browserのプロキシで接続できるなら :const:`~PROXIES_WITH_BROWSER`
        torコマンドのプロキシで接続できるなら :const:`~PROXIES_WITH_COMMAND`。
         いずれでもアクセスできなければ異常終了する。
         いずれでもアクセスできなければ異常終了する。


234行目: 253行目:
             RuntimeError: :const:`~TOR_CHECK_URL` にアクセスできない場合のエラー。
             RuntimeError: :const:`~TOR_CHECK_URL` にアクセスできない場合のエラー。
         """
         """
         print('Torのチェック中ですを')
         logger.info('Torのチェック中ですを')
         # プロキシなしでTorにアクセスできるかどうか
         # プロキシなしでTorにアクセスできるかどうか
         res: str = self.get(self.TOR_CHECK_URL, proxies=None)
         res: str = self.get(self.TOR_CHECK_URL, proxies=None)
         is_tor: bool = json.loads(res)['IsTor']
         is_tor: bool = json.loads(res)['IsTor']
         if is_tor:
         if is_tor:
             print('Tor connection OK')
             logger.info('Tor connection OK')
             return None
             return None


         # Tor BrowserのプロキシでTorにアクセスできるかどうか
         # Tor BrowserのプロキシでTorにアクセスできるかどうか
         try:
         try:
             res = self.get(self.TOR_CHECK_URL, proxies=self.PROXIES_WITH_BROWSER)
             res = self.get(self.TOR_CHECK_URL,
                          proxies=self.PROXIES_WITH_BROWSER)
             is_tor = json.loads(res)['IsTor']
             is_tor = json.loads(res)['IsTor']
             if is_tor:
             if is_tor:
                 print('Tor connection OK')
                 logger.info('Tor connection OK')
                 return self.PROXIES_WITH_BROWSER
                 return self.PROXIES_WITH_BROWSER
         except requests.exceptions.ConnectionError:
         except requests.exceptions.ConnectionError:
254行目: 274行目:
         # torコマンドのプロキシでTorにアクセスできるかどうか
         # torコマンドのプロキシでTorにアクセスできるかどうか
         try:
         try:
             res = self.get(self.TOR_CHECK_URL, proxies=self.PROXIES_WITH_COMMAND)
             res = self.get(self.TOR_CHECK_URL,
                          proxies=self.PROXIES_WITH_COMMAND)
             is_tor = json.loads(res)['IsTor']
             is_tor = json.loads(res)['IsTor']
             if is_tor:
             if is_tor:
                 print('Tor proxy OK')
                 logger.info('Tor proxy OK')
                 return self.PROXIES_WITH_COMMAND
                 return self.PROXIES_WITH_COMMAND
             else:
             else:
                 raise RuntimeError('サイトにTorのIPでアクセスできていないなりを')
                 raise RuntimeError('サイトにTorのIPでアクセスできていないなりを')
         except requests.exceptions.ConnectionError as e:
         except requests.exceptions.ConnectionError as e:
             print(e, file=sys.stderr)
             logger.critical(e)
             print('通信がTorのSOCKS proxyを経由していないなりを', file=sys.stderr)
             logger.critical('通信がTorのSOCKS proxyを経由していないなりを')
             exit(1)
             exit(1)


281行目: 302行目:


     TOR_BROWSER_PATHS: MappingProxyType[str, str] = MappingProxyType({
     TOR_BROWSER_PATHS: MappingProxyType[str, str] = MappingProxyType({
         "Windows": "",
         'Windows': '',
         "Darwin": "/Applications/Tor Browser.app/Contents/MacOS/firefox",
         'Darwin': '/Applications/Tor Browser.app/Contents/MacOS/firefox',
         "Linux": ""
         'Linux': '/usr/bin/torbrowser'
     })
     })
     """MappingProxyType[str, str]: OSごとのTor Browserのパス。
     """MappingProxyType[str, str]: OSごとのTor Browserのパス。
309行目: 330行目:


         if enable_javascript:
         if enable_javascript:
             print("reCAPTCHA対策のためJavaScriptをonにしますを")
             logger.warning('reCAPTCHA対策のためJavaScriptをonにしますを')


         options.preferences.update({
         options.preferences.update({
             "javascript.enabled": enable_javascript,
             'javascript.enabled': enable_javascript,
             "intl.accept_languages": "en-US, en",
             'intl.accept_languages': 'en-US, en',
             "intl.locale.requested": "US",
             'intl.locale.requested': 'US',
             "font.language.group": "x-western",
             'font.language.group': 'x-western',
             "dom.webdriver.enabled": False # 自動操縦と見破られないための設定
             'dom.webdriver.enabled': False # 自動操縦と見破られないための設定
         })
         })


322行目: 343行目:
             self.driver: webdriver.Firefox = webdriver.Firefox(options=options)
             self.driver: webdriver.Firefox = webdriver.Firefox(options=options)
             sleep(1)
             sleep(1)
             wait_init: WebDriverWait = WebDriverWait(self.driver, self.WAIT_TIME_FOR_INIT)
             wait_init: WebDriverWait = WebDriverWait(self.driver,
             wait_init.until(EC.element_to_be_clickable((By.ID, "connectButton")))
                                                    self.WAIT_TIME_FOR_INIT)
             self.driver.find_element(By.ID, "connectButton").click()
             wait_init.until(
             wait_init.until(EC.url_contains("about:blank")) # Torの接続が完了するまで待つ
                ec.element_to_be_clickable((By.ID, 'connectButton'))
            )
             self.driver.find_element(By.ID, 'connectButton').click()
             wait_init.until(ec.url_contains('about:blank')) # Torの接続が完了するまで待つ


             self.wait: WebDriverWait = WebDriverWait(self.driver, self.REQUEST_TIMEOUT)
             self.wait: WebDriverWait = WebDriverWait(self.driver,
         except Exception:
                                                    self.REQUEST_TIMEOUT)
         except BaseException:
             self.quit()
             self.quit()
             raise
             raise
342行目: 367行目:


         Raises:
         Raises:
             ReCaptchaFoundError: JavaScriptがオフの状態でreCAPTCHAが要求された場合のエラー。
             ReCaptchaRequiredError: JavaScriptがオフの状態でreCAPTCHAが要求された場合のエラー。
 
        Todo:
            botバレしたときに自動で他のTorサーキットに接続し直す。
         """
         """
         try:
         try:
             self.driver.find_element(By.CSS_SELECTOR, 'script[src^="https://www.google.com/recaptcha/api.js"]') # 要素がない時に例外を吐く
             self.driver.find_element( # 要素がない時に例外を吐く
                By.CSS_SELECTOR,
                'script[src^="https://www.google.com/recaptcha/api.js"]')
             if self._javascript_enabled:
             if self._javascript_enabled:
                 print("reCAPTCHAを解いてね(笑)、それはできるよね。")
                 logger.warning('reCAPTCHAを解いてね(笑)、それはできるよね。')
                 print("botバレしたらNew Tor circuit for this siteを選択するナリよ")
                 logger.warning('botバレしたらNew Tor circuit for this siteを選択するナリよ')
                 WebDriverWait(self.driver, 10000).until(EC.staleness_of(self.driver.find_element(By.CSS_SELECTOR, 'script[src^="https://www.google.com/recaptcha/api.js"]')))
                 WebDriverWait(
                 sleep(self.WAIT_TIME) ##DoS対策で待つ
                    self.driver,
                    10000).until(
                    ec.staleness_of(
                        self.driver.find_element(
                            By.CSS_SELECTOR,
                            'script[src^="https://www.google.com/recaptcha/api.js"]')))
                 sleep(self.WAIT_TIME) # DoS対策で待つ
             else:
             else:
                 raise ReCaptchaFoundError("JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: " + self.driver.current_url)
                 raise ReCaptchaRequiredError(
                    'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: '
                    + self.driver.current_url)
         except NoSuchElementException:
         except NoSuchElementException:
             # reCAPTCHAの要素がなければそのまま
             # reCAPTCHAの要素がなければそのまま
             pass
             pass


     def get(self, url: Final[str]) -> str:
     def get(self, url: str) -> str:
         """引数のURLにSeleniumでHTTP接続する。
         """引数のURLにSeleniumでHTTP接続する。


         Args:
         Args:
             url Final[str]: 接続するURL。
             url str: 接続するURL。


         Returns:
         Returns:
             str: レスポンスのHTML。
             str: レスポンスのHTML。
         """
         """
         self._random_sleep() ##DoS対策で待つ
         self._random_sleep() # DoS対策で待つ
         try:
         try:
             self.driver.get(url)
             self.driver.get(url)
372行目: 410行目:
         except WebDriverException as e:
         except WebDriverException as e:
             # Selenium固有の例外を共通の例外に変換
             # Selenium固有の例外を共通の例外に変換
             raise AccessError(str(e)) from None
             raise AccessError(str(e)) from e
         return self.driver.page_source
         return self.driver.page_source


399行目: 437行目:
             enable_javascript bool: SeleniumでJavaScriptを利用する場合はTrue。
             enable_javascript bool: SeleniumでJavaScriptを利用する場合はTrue。
         """
         """
         self.selenium_accessor: SeleniumAccessor | None = SeleniumAccessor(enable_javascript) if use_browser else None
         self.selenium_accessor: SeleniumAccessor | None = SeleniumAccessor(
            enable_javascript) if use_browser else None
         self.requests_accessor: RequestsAccessor = RequestsAccessor()
         self.requests_accessor: RequestsAccessor = RequestsAccessor()


     def __enter__(self) -> Self:
     def __enter__(self) -> Self:
         """withブロックの開始時に実行する。
         """withブロックの開始時に実行する。
        Returns:
            Self: オブジェクト自身。
         """
         """
         return self
         return self


     def __exit__(self, *args) -> None:
     def __exit__(self,
                exc_type: type[BaseException] | None,
                exc_value: BaseException | None,
                traceback: TracebackType | None) -> None:
         """withブロックの終了時に実行する。
         """withブロックの終了時に実行する。
        Args:
            exc_type (type[BaseException] | None): コンテキスト内で例外を吐いた場合の例外タイプ。
            exc_value (BaseException | None): コンテキスト内で例外を吐いた場合の例外。
            traceback (TracebackType | None): コンテキスト内で例外を吐いた場合のトレースバック。
         """
         """
         if self.selenium_accessor is not None:
         if self.selenium_accessor is not None:
             self.selenium_accessor.quit()
             self.selenium_accessor.quit()


     def request_once(self, url: Final[str]) -> str:
     def request_once(self, url: str) -> str:
         """引数のURLにHTTP接続する。
         """引数のURLにHTTP接続する。


         Args:
         Args:
             url Final[str]: 接続するURL。
             url str: 接続するURL。


         Returns:
         Returns:
436行目: 486行目:
             raise
             raise


     def _request_with_callable(self, url: Final[str], request_callable: Callable[[str], Any]) -> Any | None:
     def _request_with_callable(self,
                              url: str,
                              request_callable: Callable[[str],
                                                          Any]) -> Any | None:
         """request_callableの実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。
         """request_callableの実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。


443行目: 496行目:


         Args:
         Args:
             url Final[str]: 接続するURL
             url str: 接続するURL
             request_callable Callable[[str], Any]: 1回リクエストを行うメソッド。
             request_callable Callable[[str], Any]: 1回リクエストを行うメソッド。


         Returns:
         Returns:
             Any | None: レスポンス。接続失敗が何度も起きるとNoneを返す。
             Any | None: レスポンス。接続失敗が何度も起きるとNoneを返す。
        Note:
            失敗かどうかは呼出側で要判定
         """
         """
         for i in range(1, self.LIMIT_N_REQUESTS + 1):
         for i in range(1, self.LIMIT_N_REQUESTS + 1):
456行目: 506行目:
                 res: Any = request_callable(url)
                 res: Any = request_callable(url)
             except AccessError:
             except AccessError:
                 print(url + 'への通信失敗ナリ  ' + f"{i}/{self.LIMIT_N_REQUESTS}回")
                 logger.warning(
                    url
                    + 'への通信失敗ナリ  '
                    + f'{i}/{self.LIMIT_N_REQUESTS}回')
                 if i < self.LIMIT_N_REQUESTS:
                 if i < self.LIMIT_N_REQUESTS:
                     sleep(self.WAIT_TIME_FOR_ERROR) ##失敗時は長めに待つ
                     sleep(self.WAIT_TIME_FOR_ERROR) # 失敗時は長めに待つ
             else:
             else:
                 return res
                 return res
         return None
         return None


     def request(self, url: Final[str]) -> str | None:
     def request(self, url: str) -> str | None:
         """HTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。
         """HTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。


470行目: 523行目:


         Args:
         Args:
             url Final[str]: 接続するURL
             url str: 接続するURL


         Returns:
         Returns:
480行目: 533行目:
         return self._request_with_callable(url, self.request_once)
         return self._request_with_callable(url, self.request_once)


     def request_with_requests_module(self, url: Final[str]) -> str | None:
     def request_with_requests_module(self, url: str) -> str | None:
         """requestsモジュールでのHTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。
         """requestsモジュールでのHTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。


487行目: 540行目:


         Args:
         Args:
             url Final[str]: 接続するURL
             url str: 接続するURL


         Returns:
         Returns:
497行目: 550行目:
         return self._request_with_callable(url, self.requests_accessor.get)
         return self._request_with_callable(url, self.requests_accessor.get)


     def request_image(self, url: Final[str]) -> bytes | None:
     def request_image(self, url: str) -> bytes | None:
         """requestsモジュールで画像ファイルを取得します。
         """requestsモジュールで画像ファイルを取得します。


509行目: 562行目:
             bytes | None: レスポンスのバイト列。接続失敗が何度も起きるとNoneを返します。
             bytes | None: レスポンスのバイト列。接続失敗が何度も起きるとNoneを返します。
         """
         """
         return self._request_with_callable(url, self.requests_accessor.get_image)
         return self._request_with_callable(url,
                                          self.requests_accessor.get_image)


     @property
     @property
     def proxies(self) -> dict[str, str] | None:
     def proxies(self) -> dict[str, str] | None:
        """RequestsAccessorオブジェクトのプロキシ設定を返す。
        Returns:
            dict[str, str] | None: RequestsAccessorオブジェクトのプロキシ設定。
        """
         return self.requests_accessor.proxies
         return self.requests_accessor.proxies
class TableBuilder:
    def __init__(self, date: datetime):
        self._tables: list[str] = ['']
        self._count: int = 0  # 記録数
        self._date: datetime = date
    @property
    def count(self) -> int:
        return self._count
    def append(self, archived_tweet_url: str, text: str) -> None:
        self._tables[0] = '!' + archived_tweet_url + '\n|-\n|\n' \
            + text \
            + '\n|-\n' \
            + self._tables[0]  # wikiの文法に変化
        self._count += 1
    def dump_file(self) -> NoReturn:
        """Wikiテーブルをファイル出力し、プログラムを終了する。
        """
        self._next_day()
        result_txt: Final[str] = '\n'.join(self._tables)
        with codecs.open('tweet.txt', 'w', 'utf-8') as f:
            f.write(result_txt)
        logger.info('テキストファイル手に入ったやで〜')
        exit(0)
    def next_day_if_necessary(self, date: datetime | None = None) -> None:
        if date is None:
            return
        if date.year != self._date.year or date.month != self._date.month or date.day != self._date.day:
            self._next_day(date)
    def _next_day(self, date: datetime | None = None) -> None:
        if self._tables[0]:
            self._tables[0] = self._convert_to_text_table(self._tables[0])
            if os.name == 'nt':  # Windows
                self._txt_data[0] = self._date.strftime(
                    '\n=== %#m月%#d日 ===\n') + self._txt_data[0]
                logger.info(self._date.strftime('%#m月%#d日のツイートを取得完了ですを'))
            else:  # Mac or Linux
                self._tables[0] = self._date.strftime(
                    '\n=== %-m月%-d日 ===\n') + self._tables[0]
                logger.info(self._date.strftime('%-m月%-d日のツイートを取得完了ですを'))
        if date is not None:
            self._tables.insert(0, '')
            self._date = date
    def _convert_to_text_table(self, text: str) -> str:
        """``self._txt_data[0]`` にwikiでテーブル表示にするためのヘッダとフッタをつける。
        Args:
            text str: ヘッダとフッタがないWikiテーブル。
        Returns:
            str: テーブル表示用のヘッダとフッタがついたWikiテーブル。
        """
        return '{|class="wikitable" style="text-align: left;"\n' + text + '|}'
    @staticmethod
    def escape_wiki_reserved_words(text: str) -> str:
        """MediaWikiの文法と衝突する文字を無効化する。
        Args:
            text str: ツイートの文字列。
        Returns:
            str: MediaWikiの文法と衝突する文字がエスケープされたツイートの文字列。
        """
        def escape_nolink_urls(text: str) -> str:
            """Archiveテンプレートの中にないURLがWikiでaタグに変換されないよう無効化する。
            Args:
                text str: ツイートの文字列。
            Returns:
                str: Archiveテンプレートの中にないURLがnowikiタグでエスケープされた文字列。
            """
            is_in_archive_template: bool = False
            i: int = 0
            while i < len(text):
                if is_in_archive_template:
                    if text[i:i + 2] == '}}':
                        is_in_archive_template = False
                        i += 2
                else:
                    if text[i:i + 10] == '{{Archive|' or text[i:i + 10] == '{{archive|':
                        is_in_archive_template = True
                        i += 10
                    elif text[i:i + 8] == 'https://':
                        text = text[:i] + \
                            '<nowiki>https://</nowiki>' + text[i + 8:]
                        i += 25
                    elif text[i:i + 7] == 'http://':
                        text = text[:i] + \
                            '<nowiki>http://</nowiki>' + text[i + 7:]
                        i += 24
                i += 1
            return text
        text = text.replace('\n', '<br>\n')
        text = re.sub(r'^ ', '&nbsp;', text, flags=re.MULTILINE)
        text = re.sub(
            r'^([\*#:;])',
            r'<nowiki>\1</nowiki>',
            text,
            flags=re.MULTILINE)
        text = re.sub(
            r'^----',
            '<nowiki>----</nowiki>',
            text,
            flags=re.MULTILINE)
        text = escape_nolink_urls(text)
        return text
    @staticmethod
    def archive_url(
            url: str,
            archived_url: str,
            text: str | None = None) -> str:
        """URLをArchiveテンプレートでラップする。
        Args:
            url str: ラップするURL。
            archive_url str: ラップするURLの魚拓のURL。
            text str | None: ArchiveテンプレートでURLの代わりに表示する文字列。
        Returns:
            str: ArchiveタグでラップしたURL。
        """
        if text is None:
            return '{{Archive|1=' + unquote(url) + '|2=' + archived_url + '}}'
        else:
            return '{{Archive|1=' + \
                unquote(url) + '|2=' + archived_url + '|3=' + text + '}}'
    @staticmethod
    def callinshowlink_url(url: str, archived_url: str) -> str:
        """URLをCallinShowLinkテンプレートでラップする。
        Args:
            url str: ラップするURL。
            archive_url str: ラップするURLの魚拓のURL。
        Returns:
            str: CallinShowLinkタグでラップしたURL。
        """
        return '{{CallinShowLink|1=' + url + '|2=' + archived_url + '}}'




563行目: 773行目:
     """
     """


     LIMIT_N_TWEETS: Final[int] = 10000
     LIMIT_N_TWEETS: Final[int] = 100
     """Final[int]: 取得するツイート数の上限。
     """Final[int]: 取得するツイート数の上限。
     """
     """
573行目: 783行目:
     TWEETS_OR_REPLIES: Final[str] = 'with_replies'
     TWEETS_OR_REPLIES: Final[str] = 'with_replies'
     """Final[str]: NitterのURLのドメインとユーザーネーム部分の接続部品。
     """Final[str]: NitterのURLのドメインとユーザーネーム部分の接続部品。
    Returns:
        dict[str, str] | None: プロキシ設定。
     """
     """


603行目: 810行目:
         """コンストラクタ。
         """コンストラクタ。
         """
         """
         self._check_slash() ##スラッシュが抜けてないかチェック
         self._check_slash() # スラッシュが抜けてないかチェック
        self._txt_data: list[str] = []
        self._limit_count: int = 0 ##記録数


     def _set_queries(self, accessor: AccessorHandler, krsw: bool):
     def _set_queries(self, accessor: AccessorHandler, krsw: bool) -> None:
         """検索条件を設定する。
         """検索条件を設定する。


618行目: 823行目:
         """
         """


         ##ユーザー名取得
         # ユーザー名取得
         if krsw:
         if krsw:
             print('名前は自動的に' + self.CALLINSHOW + 'にナリます')
             logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます')
            self.name: Final[str] = self.CALLINSHOW
        self._name: str = self.CALLINSHOW if krsw else self._get_name(accessor)
        else:
            self.name: Final[str] = self._get_name(accessor)


         ##検索クエリとページ取得
         # 検索クエリとページ取得
         self.query_strs: list[str] = []
         self.query_strs: list[str] = []
         if krsw:
         if krsw:
             print('クエリは自動的になしにナリます')
             logger.info('クエリは自動的になしにナリます')
         else:
         else:
             self._get_query()
             self._get_query()
         self._page: str | None = accessor.request(urljoin(self.NITTER_INSTANCE, self.name + '/' + self.TWEETS_OR_REPLIES))
         page_optional: str | None = accessor.request(
         if self._page is None:
            urljoin(self.NITTER_INSTANCE, self._name + '/'
                    + self.TWEETS_OR_REPLIES))
         if page_optional is None:
             self._fail()
             self._fail()
        self._page: str = page_optional


         ##終わりにするツイート取得
         # 終わりにするツイート取得
         if krsw:
         if krsw:
             print('終わりにするツイートは自動的になしにナリます')
             logger.info('終わりにするツイートは自動的になしにナリます')
            self._stop: Final[str] = ''
        self._stop: str = '' if krsw else self._stop_word()
         else:
 
             self._stop: Final[str] = self._stop_word()
         # 日付取得
        timeline_item: Tag | NavigableString | None = BeautifulSoup(
             self._page, 'html.parser').find(
            class_='timeline-item')
        assert isinstance(timeline_item, Tag)
        date: datetime = self._tweet_date(timeline_item)


        ##日付取得
         self._table_builder: TableBuilder = TableBuilder(date)
         self._date: datetime = self._tweet_date(BeautifulSoup(self._page, 'html.parser').find(class_='timeline-item'))
        self._txt_data.append('')
        print()


     def _check_slash(self) -> None | NoReturn:
     def _check_slash(self) -> None | NoReturn:
655行目: 863行目:
         Raises:
         Raises:
             RuntimeError: URLの最後にスラッシュがついていない場合に出る。
             RuntimeError: URLの最後にスラッシュがついていない場合に出る。
        Todo:
            できたらこのメソッドなしで動くようにする。
         """
         """
         if self.NITTER_INSTANCE[-1] != '/':
         if self.NITTER_INSTANCE[-1] != '/':
668行目: 873行目:
             raise RuntimeError('TWITTER_URLの末尾には/が必須です')
             raise RuntimeError('TWITTER_URLの末尾には/が必須です')


     def _check_nitter_instance(self, accessor: AccessorHandler) -> None | NoReturn:
     def _check_nitter_instance(
            self, accessor: AccessorHandler) -> None | NoReturn:
         """Nitterのインスタンスが生きているかチェックする。
         """Nitterのインスタンスが生きているかチェックする。


         死んでいたらそこで終了。
         死んでいたらそこで終了。
         接続を一回しか試さない :func:`~_request_once` を使っているのは、激重インスタンスが指定されたとき試行回数増やして偶然成功してそのまま実行されるのを躱すため。
         接続を一回しか試さない :func:`~_request_once` を使っているのは、
        激重インスタンスが指定されたとき試行回数増やして偶然成功してそのまま実行されるのを躱すため。


         Args:
         Args:
680行目: 887行目:
             None | NoReturn: NitterにアクセスできればNone。できなければ終了。
             None | NoReturn: NitterにアクセスできればNone。できなければ終了。
         """
         """
         print("Nitterのインスタンスチェック中ですを")
         logger.info('Nitterのインスタンスチェック中ですを')
         try:
         try:
             accessor.request_once(self.NITTER_INSTANCE)
             accessor.request_once(self.NITTER_INSTANCE)
         except AccessError as e: ##エラー発生時は終了
         except AccessError as e: # エラー発生時は終了
             print(e, file=sys.stderr)
             logger.critical(e)
             print('インスタンスが死んでますを', file=sys.stderr)
             logger.critical('インスタンスが死んでますを')
             exit(1)
             exit(1)
         print("Nitter OK")
         logger.info('Nitter OK')


     def _check_archive_instance(self, accessor: AccessorHandler) -> None | NoReturn:
     def _check_archive_instance(
            self, accessor: AccessorHandler) -> None | NoReturn:
         """archive.todayのTor用インスタンスが生きているかチェックする。
         """archive.todayのTor用インスタンスが生きているかチェックする。


698行目: 906行目:
             None | NoReturn: archive.todayのTorインスタンスにアクセスできればNone。できなければ終了。
             None | NoReturn: archive.todayのTorインスタンスにアクセスできればNone。できなければ終了。
         """
         """
         print("archive.todayのTorインスタンスチェック中ですを")
         logger.info('archive.todayのTorインスタンスチェック中ですを')
         try:
         try:
             accessor.request_once(self.ARCHIVE_TODAY)
             accessor.request_once(self.ARCHIVE_TODAY)
         except AccessError as e: ##エラー発生時は終了
         except AccessError as e: # エラー発生時は終了
             print(e, file=sys.stderr)
             logger.critical(e)
             print('インスタンスが死んでますを', file=sys.stderr)
             logger.critical('インスタンスが死んでますを')
             exit(1)
             exit(1)
         print("archive.today OK")
         logger.info('archive.today OK')


     def _invidious_instances(self, accessor: AccessorHandler) -> tuple[str] | NoReturn:
     def _invidious_instances(
            self,
            accessor: AccessorHandler) -> tuple[str, ...] | NoReturn:
         """Invidiousのインスタンスのタプルを取得する。
         """Invidiousのインスタンスのタプルを取得する。


714行目: 924行目:


         Returns:
         Returns:
             tuple[str] | NoReturn: Invidiousのインスタンスのタプル。Invidiousのインスタンスが死んでいれば終了。
             tuple[str, ...] | NoReturn: Invidiousのインスタンスのタプル。Invidiousのインスタンスが死んでいれば終了。
         """
         """
         print("Invidiousのインスタンスリストを取得中ですを")
         logger.info('Invidiousのインスタンスリストを取得中ですを')
         invidious_json: Final[str] | None = accessor.request_with_requests_module('https://api.invidious.io/instances.json')
         invidious_json: Final[str] | None = accessor.request_with_requests_module(
            'https://api.invidious.io/instances.json')
         if invidious_json is None:
         if invidious_json is None:
             print("Invidiousが死んでますを")
             logger.critical('Invidiousが死んでますを')
             exit(1)
             exit(1)
         instance_list: list[str] = []
         instance_list: list[str] = []
744行目: 955行目:
         """
         """
         while True:
         while True:
             print('アカウント名を入れなければない。空白だと自動的に' + self.CALLINSHOW + 'になりますを')
             logger.info(
             account_str: str = input() ##ユーザー入力受付
                'アカウント名を入れなければない。空白だと自動的に'
             ##空欄で降臨ショー
                + self.CALLINSHOW
                + 'になりますを')
             account_str: str = input()
             # 空欄で降臨ショー
             if account_str == '':
             if account_str == '':
                 return self.CALLINSHOW
                 return self.CALLINSHOW
             else:
             else:
                 res: Final[str]| None = accessor.request(urljoin(self.NITTER_INSTANCE, account_str)) ##リクエストして結果取得
                 res: Final[str | None] = accessor.request(
                 if res is None: ##リクエスト失敗判定
                    urljoin(self.NITTER_INSTANCE, account_str))
                 if res is None: # リクエスト失敗判定
                     self._fail()
                     self._fail()
                 soup: BeautifulSoup = BeautifulSoup(res, 'html.parser') ##beautifulsoupでレスポンス解析
                 soup: BeautifulSoup = BeautifulSoup(res, 'html.parser')
                 if soup.title == self.NITTER_ERROR_TITLE: ##タイトルがエラーでないか判定
                 if soup.title == self.NITTER_ERROR_TITLE:
                     print(account_str + "は実在の人物ではありませんでした") ##エラー時ループに戻る
                     logger.warning(account_str + 'は実在の人物ではありませんでした')
                 else:
                 else:
                     print("最終的に出会ったのが@" + account_str + "だった。")
                     logger.info('最終的に出会ったのが@' + account_str + 'だった。')
                     return account_str ##成功時アカウント名返す
                     return account_str


     def _get_query(self) -> None:
     def _get_query(self) -> None:
765行目: 980行目:
         取得したクエリは ``self.query_strs`` に加えられる。
         取得したクエリは ``self.query_strs`` に加えられる。
         """
         """
         print("検索クエリを入れるナリ。複数ある時は一つの塊ごとに入力するナリ。空欄を入れると検索開始ナリ。")
         logger.info('検索クエリを入れるナリ。複数ある時は一つの塊ごとに入力するナリ。空欄を入れると検索開始ナリ。')
         print("例:「無能」→改行→「脱糞」→改行→「弟殺し」→改行→空欄で改行")
         logger.info('例:「無能」→改行→「脱糞」→改行→「弟殺し」→改行→空欄で改行')
         query_input: str = input() ##ユーザー入力受付
         query_input: str = input()
         ##空欄が押されるまでユーザー入力受付
         # 空欄が押されるまでユーザー入力受付
         while query_input != '':
         while query_input != '':
             self.query_strs.append(query_input)
             self.query_strs.append(query_input)
             query_input = input()
             query_input = input()
         print("クエリのピースが埋まっていく。")
         logger.info('クエリのピースが埋まっていく。')


     def _fail(self) -> NoReturn:
     def _fail(self) -> NoReturn:
779行目: 994行目:
         取得に成功した分だけファイルにダンプし、プログラムを終了する。
         取得に成功した分だけファイルにダンプし、プログラムを終了する。
         """
         """
         print("接続失敗しすぎで強制終了ナリ")
         logger.critical('接続失敗しすぎで強制終了ナリ')
         if len(self._txt_data) > 0: ##取得成功したデータがあれば発行
         if self._table_builder.count > 0: # 取得成功したデータがあれば発行
             print("取得成功した分だけ発行しますを")
             logger.critical('取得成功した分だけ発行しますを')
            self._make_txt()
            self._table_builder.dump_file()
        else:
         exit(1)
            exit(1) ##終了
 
    def _convert_to_text_table(self, text: str) -> str:
        """``self._txt_data[0]`` にwikiでテーブル表示にするためのヘッダとフッタをつける。
 
        Args:
            text str: ヘッダとフッタがないWikiテーブル。
 
        Returns:
            str: テーブル表示用のヘッダとフッタがついたWikiテーブル。
        """
        return '{|class="wikitable" style="text-align: left;"\n' + text + '|}'
 
    def _make_txt(self) -> NoReturn:
        """Wikiテーブルをファイル出力し、プログラムを終了する。
        """
        self._next_day()
        result_txt: Final[str] = '\n'.join(self._txt_data) ##リストを合体
        ##ファイル出力
        with codecs.open('tweet.txt', 'w', 'utf-8') as f:
            f.write(result_txt)
        print("テキストファイル手に入ったやで〜")
         exit(0) ##終了


     def _stop_word(self) -> str:
     def _stop_word(self) -> str:
814行目: 1,006行目:
             str: ツイートの記録を中断するための文。ツイート内容の一文がその文と一致したら記録を中断する。
             str: ツイートの記録を中断するための文。ツイート内容の一文がその文と一致したら記録を中断する。
         """
         """
         print(f"ここにツイートの本文を入れる!すると・・・・なんとそれを含むツイートで記録を中断する!(これは本当の仕様です。)ちなみに、空欄だと{self.LIMIT_N_TWEETS}件まで終了しない。")
         logger.info(
         end_str: Final[str] = input() ##ユーザー入力受付
            f'ここにツイートの本文を入れる!すると・・・・なんとそれを含むツイートで記録を中断する!(これは本当の仕様です。)ちなみに、空欄だと{self.LIMIT_N_TWEETS}件まで終了しない。')
         end_str: Final[str] = input()
         return end_str
         return end_str


     def _download_media(self, media_name: Final[str], accessor: AccessorHandler) -> bool:
     def _download_media(
            self,
            media_name: str,
            accessor: AccessorHandler) -> bool:
         """ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。
         """ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。


         Args:
         Args:
             media_name Final[str]: 画像ファイル名。Nitter上のimgタグのsrc属性では、``/pic/media%2F`` に後続する。
             media_name str: 画像ファイル名。Nitter上のimgタグのsrc属性では、``/pic/media%2F`` に後続する。
             accessor AccessorHandler: アクセスハンドラ
             accessor AccessorHandler: アクセスハンドラ


832行目: 1,028行目:
         image_bytes: Final[bytes | None] = accessor.request_image(url)
         image_bytes: Final[bytes | None] = accessor.request_image(url)
         if image_bytes is not None:
         if image_bytes is not None:
             with open(os.path.join(self.MEDIA_DIR, media_name), "wb") as f:
             with open(os.path.join(self.MEDIA_DIR, media_name), 'wb') as f:
                 f.write(image_bytes)
                 f.write(image_bytes)
             return True
             return True
838行目: 1,034行目:
             return False
             return False


     def _tweet_date(self, tweet: bs4.element.Tag) -> datetime:
     def _tweet_date(self, tweet: Tag) -> datetime:
         """ツイートの時刻を取得する。
         """ツイートの時刻を取得する。


         Args:
         Args:
             tweet bs4.element.Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。
             tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。


         Returns:
         Returns:
             datetime: ツイートの時刻。
             datetime: ツイートの時刻。
         """
         """
         date_str: str = tweet.find(class_='tweet-date').a['title']
         tweet_date: Tag | NavigableString | None = tweet.find(
         date: datetime = datetime.strptime(date_str, '%b %d, %Y · %I:%M %p %Z').replace(tzinfo=ZoneInfo('UTC')).astimezone(ZoneInfo('Asia/Tokyo'))
            class_='tweet-date')
        assert isinstance(tweet_date, Tag)
        tweet_date_a: Tag | None = tweet_date.a
        assert tweet_date_a is not None
        date_str: str | list[str] | None = tweet_date_a.get('title')
        assert isinstance(date_str, str)
         date: datetime = datetime.strptime(
            date_str,
            '%b %d, %Y · %I:%M %p %Z').replace(
            tzinfo=ZoneInfo('UTC')).astimezone(
            ZoneInfo('Asia/Tokyo'))
         return date
         return date


    #self._dateの日付のツイートがなくなったときの処理
     def _get_tweet_media(
     def _next_day(self, date: datetime|None = None) -> None:
             self,
        """1日分のツイートをテーブル形式に変換し、その日のツイートを記録し終わったことを通知して、``self._txt_data`` の0番目に空文字列を追加する。
             tweet: Tag,
 
            accessor: AccessorHandler) -> str:
        Args:
            date datetime|None:
                記録した日付の前日の日付。Noneでなければ、``self._date`` をその値に更新する。
        """
        if self._txt_data[0]: # 空でなければ出力
            self._txt_data[0] = self._convert_to_text_table(self._txt_data[0])
            if os.name == 'nt': # Windows
                self._txt_data[0] = self._date.strftime('\n=== %#m月%#d日 ===\n') + self._txt_data[0]
                print(self._date.strftime('%#m月%#d日のツイートを取得完了ですを'))
            else: # Mac or Linux
                self._txt_data[0] = self._date.strftime('\n=== %-m月%-d日 ===\n') + self._txt_data[0]
                print(self._date.strftime('%-m月%-d日のツイートを取得完了ですを'))
        if date is not None:
             self._txt_data.insert(0, '')
             self._date = date
 
    def _get_tweet_media(self, tweet: bs4.element.Tag, accessor: AccessorHandler) -> str:
         """ツイートの画像や動画を取得する。
         """ツイートの画像や動画を取得する。


         Args:
         Args:
             tweet bs4.element.Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。
             tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。
             accessor AccessorHandler: アクセスハンドラ
             accessor AccessorHandler: アクセスハンドラ


881行目: 1,070行目:
             str: Wiki記法でのファイルへのリンクの文字列。
             str: Wiki記法でのファイルへのリンクの文字列。
         """
         """
         tweet_media: bs4.element.Tag | None = tweet.select_one('.tweet-body > .attachments') # 引用リツイート内のメディアを選択しないように.tweet-body直下の.attachmentsを選択
         tweet_media: Tag | None = tweet.select_one(
            '.tweet-body > .attachments') # 引用リツイート内のメディアを選択しないように.tweet-body直下の.attachmentsを選択
         media_txt: str = ''
         media_txt: str = ''
         if tweet_media is not None:
         if tweet_media is not None:
888行目: 1,078行目:
             for image_a in tweet_media.select('.attachment.image a'):
             for image_a in tweet_media.select('.attachment.image a'):
                 try:
                 try:
                     media_name: str = [group for group in re.search(r'%2F([^%]*\.jpg)|%2F([^%]*\.jpeg)|%2F([^%]*\.png)|%2F([^%]*\.gif)', image_a.get('href')).groups() if group][0]
                     media_name: str = [
                     media_list.append(f"[[ファイル:{media_name}|240px]]")
                        group for group in re.search(
                            r'%2F([^%]*\.jpg)|%2F([^%]*\.jpeg)|%2F([^%]*\.png)|%2F([^%]*\.gif)',
                            image_a.get('href')).groups() if group][0]
                     media_list.append(f'[[ファイル:{media_name}|240px]]')
                     if self._download_media(media_name, accessor):
                     if self._download_media(media_name, accessor):
                         print(os.path.join(self.MEDIA_DIR, media_name) + ' をアップロードしなければない。')
                         logger.info(
                            os.path.join(
                                self.MEDIA_DIR,
                                media_name)
                            + ' をアップロードしなければない。')
                     else:
                     else:
                         print(urljoin('https://pbs.twimg.com/media/', media_name) + ' をアップロードしなければない。')
                         logger.info(
                            urljoin(
                                'https://pbs.twimg.com/media/',
                                media_name)
                            + ' をアップロードしなければない。')
                 except AttributeError:
                 except AttributeError:
                     tweet_url: str = urljoin(self.TWITTER_URL, re.sub('#[^#]*$', '', tweet.find(class_='tweet-link').get('href'))) ##ツイートのURL作成
                     tweet_url: str = urljoin(
                     print(f"{tweet_url}の画像が取得できませんでしたを 当職無能")
                        self.TWITTER_URL,
                     media_list.append(f"[[ファイル:(画像の取得ができませんでした)|240px]]")
                        re.sub(
                            '#[^#]*$',
                            '',
                            tweet.find(
                                class_='tweet-link').get('href')))
                     logger.error(f'{tweet_url}の画像が取得できませんでしたを 当職無能')
                     media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]')
             # ツイートの動画の取得。ffmpegが入っていなければ手動で取得すること
             # ツイートの動画の取得。ffmpegが入っていなければ手動で取得すること
             for i, video_container in enumerate(tweet_media.select('.attachment.video-container')):
             for i, video_container in enumerate(
                 tweet_url: str = urljoin(self.TWITTER_URL, re.sub('#[^#]*$', '', tweet.find(class_='tweet-link').get('href'))) ##ツイートのURL作成
                    tweet_media.select('.attachment.video-container')):
                 tweet_url: str = urljoin(
                    self.TWITTER_URL,
                    re.sub(
                        '#[^#]*$',
                        '',
                        tweet.find(
                            class_='tweet-link').get('href'))) # ツイートのURL作成
                 video = video_container.select_one('video')
                 video = video_container.select_one('video')
                 if video is None:
                 if video is None:
                     print(f"{tweet_url}の動画が取得できませんでしたを 当職無能")
                     logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能')
                     media_list.append(f"[[ファイル:(動画の取得ができませんでした)|240px]]")
                     media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
                     continue
                     continue


                 if subprocess.run(['which', 'ffmpeg'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode != 0:
                 if subprocess.run(['which',
                     print(f"ffmpegがないため{tweet_url}の動画が取得できませんでしたを")
                                  'ffmpeg'],
                     media_list.append(f"[[ファイル:(動画の取得ができませんでした)|240px]]")
                                  stdout=subprocess.DEVNULL,
                 else: # ffmpegがある場合
                                  stderr=subprocess.DEVNULL).returncode != 0:
                     media_url: str = unquote(re.search(r'[^\/]+$', video.get('data-url')).group(0))
                     logger.error(f'ffmpegがないため{tweet_url}の動画が取得できませんでしたを')
                     media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
                 else: # ffmpegがある場合
                    # TODO: ブロックが大きすぎるので別メソッドに切り出す
                     media_url: str = unquote(
                        re.search(
                            r'[^\/]+$',
                            video.get('data-url')).group(0))
                     tweet_id: str = tweet_url.split('/')[-1]
                     tweet_id: str = tweet_url.split('/')[-1]
                     # 動画のダウンロード
                     # 動画のダウンロード
                     if accessor.proxies is not None:
                     if accessor.proxies is not None:
                         returncode: int = subprocess.run(["ffmpeg", "-y", "-http_proxy", "accessor.proxies['http']", "-i", urljoin(self.NITTER_INSTANCE, media_url), "-c", "copy", f"{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts"], stdout=subprocess.DEVNULL).returncode
                         returncode: int = subprocess.run(
                            [
                                'ffmpeg', '-y', '-http_proxy',
                                'accessor.proxies["http"]', '-i',
                                urljoin(self.NITTER_INSTANCE, media_url),
                                '-c', 'copy',
                                f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts'
                            ],
                            stdout=subprocess.DEVNULL).returncode
                     else:
                     else:
                         returncode: int = subprocess.run(["ffmpeg", "-y", "-i", urljoin(self.NITTER_INSTANCE, media_url), "-c", "copy", f"{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts"], stdout=subprocess.DEVNULL).returncode
                         returncode: int = subprocess.run(
                            [
                                'ffmpeg', '-y', '-i',
                                urljoin(self.NITTER_INSTANCE, media_url),
                                '-c', 'copy',
                                f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts'
                            ],
                            stdout=subprocess.DEVNULL).returncode
                     # 取得成功したらtsをmp4に変換
                     # 取得成功したらtsをmp4に変換
                     if returncode == 0:
                     if returncode == 0:
                         ts2mp4_returncode: int = subprocess.run(["ffmpeg", "-y", "-i", f"{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts", "-acodec", "copy", "-vcodec", "copy", f"{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4"], stdout=subprocess.DEVNULL).returncode
                         ts2mp4_returncode: int = subprocess.run(
                            [
                                'ffmpeg', '-y', '-i',
                                f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts',
                                '-acodec', 'copy', '-vcodec', 'copy',
                                f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4'
                            ],
                            stdout=subprocess.DEVNULL).returncode
                         if ts2mp4_returncode == 0:
                         if ts2mp4_returncode == 0:
                             print(f"{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4をアップロードしなければない。")
                             logger.info(
                                f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4をアップロードしなければない。')
                         else:
                         else:
                             print(f"{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.tsをmp4に変換してアップロードしなければない。")
                             logger.info(
                         media_list.append(f"[[ファイル:{tweet_id}_{i}.mp4|240px]]")
                                f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.tsをmp4に変換してアップロードしなければない。')
                         media_list.append(f'[[ファイル:{tweet_id}_{i}.mp4|240px]]')
                     else:
                     else:
                         print(f"{tweet_url}の動画が取得できませんでしたを 当職無能")
                         logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能')
                         media_list.append(f"[[ファイル:(動画の取得ができませんでした)|240px]]")
                         media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
             media_txt = ' '.join(media_list)
             media_txt = ' '.join(media_list)
         return media_txt
         return media_txt


     def _get_tweet_quote(self, tweet: bs4.element.Tag, accessor: AccessorHandler) -> str:
     def _get_tweet_quote(
            self,
            tweet: Tag,
            accessor: AccessorHandler) -> str:
         """引用リツイートの引用元へのリンクを取得する。
         """引用リツイートの引用元へのリンクを取得する。


         Args:
         Args:
             tweet bs4.element.Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。
             tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。
             accessor AccessorHandler: アクセスハンドラ
             accessor AccessorHandler: アクセスハンドラ


942行目: 1,190行目:
             str: Archiveテンプレートでラップされた引用元ツイートへのリンク。
             str: Archiveテンプレートでラップされた引用元ツイートへのリンク。
         """
         """
         tweet_quote: Final[bs4.element.Tag | None] = tweet.select_one('.tweet-body > .quote.quote-big') # 引用リツイートを選択
         tweet_quote: Final[Tag | None] = tweet.select_one(
            '.tweet-body > .quote.quote-big') # 引用リツイートを選択
         quote_txt: str = ''
         quote_txt: str = ''
         if tweet_quote is not None:
         if tweet_quote is not None:
949行目: 1,198行目:
             link = urljoin(self.TWITTER_URL, link)
             link = urljoin(self.TWITTER_URL, link)
             quote_txt = self._archive_url(link, accessor)
             quote_txt = self._archive_url(link, accessor)
         tweet_quote_unavailable: Final[bs4.element.Tag | None] = tweet.select_one('.tweet-body > .quote.unavailable') # 引用リツイートを選択
         tweet_quote_unavailable: Final[Tag | None] = tweet.select_one(
            '.tweet-body > .quote.unavailable') # 引用リツイートを選択
         if tweet_quote_unavailable is not None:
         if tweet_quote_unavailable is not None:
             quote_txt = '(引用元が削除されました)'
             quote_txt = '(引用元が削除されました)'
         return quote_txt
         return quote_txt


     def _get_tweet_poll(self, tweet: bs4.element.Tag) -> str:
     def _get_tweet_poll(self, tweet: Tag) -> str:
         """ツイートの投票結果を取得する。
         """ツイートの投票結果を取得する。


         Args:
         Args:
             tweet bs4.element.Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。
             tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。


         Returns:
         Returns:
             str: Wiki形式に書き直した投票結果。
             str: Wiki形式に書き直した投票結果。
         """
         """
         tweet_poll: Final[bs4.element.Tag | None] = tweet.select_one('.tweet-body > .poll')
         tweet_poll: Final[Tag | None] = tweet.select_one(
            '.tweet-body > .poll')
         poll_txt: str = ''
         poll_txt: str = ''
         if tweet_poll is not None:
         if tweet_poll is not None:
             poll_meters: Final[bs4.element.ResultSet] = tweet_poll.select('.poll-meter')
             poll_meters: Final[ResultSet[Tag]] = tweet_poll.select(
                '.poll-meter')
             for poll_meter in poll_meters:
             for poll_meter in poll_meters:
                 ratio: str = poll_meter.select_one('.poll-choice-value').text
                 ratio: str = poll_meter.select_one('.poll-choice-value').text
                 if 'leader' in poll_meter['class']:
                 if 'leader' in poll_meter['class']:
                     poll_txt += f'<br>\n&nbsp; <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgba(29, 155, 240, 0.58) 0 {ratio}, transparent {ratio} 100%); font-weight: bold;">' + ratio + ' ' + poll_meter.select_one('.poll-choice-option').text + '</span>'
                     poll_txt += f'<br>\n&nbsp; <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgba(29, 155, 240, 0.58) 0 {ratio}, transparent {ratio} 100%); font-weight: bold;">' + ratio + ' ' + poll_meter.select_one(
                        '.poll-choice-option').text + '</span>'
                 else:
                 else:
                     poll_txt += f'<br>\n&nbsp; <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgb(207, 217, 222) 0 {ratio}, transparent {ratio} 100%);">' + ratio + ' ' + poll_meter.select_one('.poll-choice-option').text + '</span>'
                     poll_txt += f'<br>\n&nbsp; <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgb(207, 217, 222) 0 {ratio}, transparent {ratio} 100%);">' + ratio + ' ' + poll_meter.select_one(
             poll_txt += '<br>\n&nbsp; <span style="font-size: small;">' + tweet_poll.select_one('.poll-info').text + '</span>'
                        '.poll-choice-option').text + '</span>'
             poll_txt += '<br>\n&nbsp; <span style="font-size: small;">' + \
                tweet_poll.select_one('.poll-info').text + '</span>'
         return poll_txt
         return poll_txt


     def _get_timeline_items(self, soup: BeautifulSoup) -> list[bs4.element.Tag]:
     def _get_timeline_items(self,
                            soup: BeautifulSoup) -> list[Tag]:
         """タイムラインのツイートを取得。
         """タイムラインのツイートを取得。


985行目: 1,241行目:


         Returns:
         Returns:
             list[bs4.element.Tag]: ツイートのアイテムである ``.timeline-item`` タグを表すbs4.element.Tagオブジェクトのリスト。
             list[Tag]: ツイートのアイテムである ``.timeline-item`` タグを表すTagオブジェクトのリスト。
         """
         """
         timeline_item_list: list[bs4.element.Tag] = []
         timeline_item_list: list[Tag] = []
         for item_or_list in soup.select('.timeline > .timeline-item, .timeline > .thread-line'):
         for item_or_list in soup.select(
                '.timeline > .timeline-item, .timeline > .thread-line'):
             if 'unavailable' in item_or_list.attrs['class']:
             if 'unavailable' in item_or_list.attrs['class']:
                 continue
                 continue
1,007行目: 1,264行目:
             accessor AccessorHandler: アクセスハンドラ
             accessor AccessorHandler: アクセスハンドラ
         """
         """
         soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser') ##beautifulsoupでレスポンス解析
         soup: Final[BeautifulSoup] = BeautifulSoup(
         tweets: Final[list[bs4.element.Tag]] = self._get_timeline_items(soup) ##一ツイートのブロックごとにリストで取得
            self._page, 'html.parser') # beautifulsoupでレスポンス解析
         for tweet in tweets: ##一ツイート毎に処理
         tweets: Final[list[Tag]] = self._get_timeline_items(
             if tweet.a.text == self.NEWEST: ##Load Newestのボタンは処理しない
            soup) # 一ツイートのブロックごとにリストで取得
         for tweet in tweets: # 一ツイート毎に処理
             if tweet.a.text == self.NEWEST:
                # Load Newestのボタンは処理しない
                 continue
                 continue
             if tweet.find(class_='retweet-header') is not None: ##retweet-headerはリツイートを示すので入っていれば処理しない
             if tweet.find(class_='retweet-header') is not None:
                # retweet-headerはリツイートを示すので入っていれば処理しない
                 continue
                 continue
             if tweet.find(class_='pinned') is not None: ##pinnedは固定ツイートを示すので入っていれば処理しない
             if tweet.find(class_='pinned') is not None:
                # pinnedは固定ツイートを示すので入っていれば処理しない
                 continue
                 continue
             if len(self.query_strs) > 0: # クエリが指定されている場合、一つでも含まないツイートは処理しない、TODO: 未テスト
             if len(self.query_strs) > 0:
                # クエリが指定されている場合、一つでも含まないツイートは処理しない、TODO: 未テスト
                 not_match: bool = False
                 not_match: bool = False
                 for query_str in query_strs:
                 for query_str in self.query_strs:
                     if query_str not in tweet.text:
                     if query_str not in tweet.text:
                         not_match = True
                         not_match = True
1,025行目: 1,288行目:
                     continue
                     continue


             tweet_url: str = urljoin(self.TWITTER_URL, re.sub('#[^#]*$', '', tweet.find(class_='tweet-link').get('href'))) ##ツイートのURL作成
             tweet_url: str = urljoin(
                self.TWITTER_URL,
                re.sub(
                    '#[^#]*$',
                    '',
                    tweet.find(
                        class_='tweet-link').get('href')))
             date: datetime = self._tweet_date(tweet)
             date: datetime = self._tweet_date(tweet)
             if date.year != self._date.year or date.month != self._date.month or date.day != self._date.day:
             self._table_builder.next_day_if_necessary(date)
                self._next_day(date)
             archived_tweet_url: str = self._callinshowlink_url(
             archived_tweet_url: str = self._callinshowlink_url(tweet_url, accessor) ##ツイートURLをテンプレートCallinShowlinkに変化
                tweet_url, accessor)
             tweet_content: bs4.element.Tag = tweet.find(class_='tweet-content media-body') ##ツイートの中身だけ取り出す
             tweet_content: Tag = tweet.find(
             self._archive_soup(tweet_content, accessor) ##ツイートの中身のリンクをテンプレートArchiveに変化
                class_='tweet-content media-body')
             media_txt: str = self._get_tweet_media(tweet, accessor) ##ツイートに画像などのメディアを追加
             self._archive_soup(tweet_content, accessor)
             quote_txt: str = self._get_tweet_quote(tweet, accessor) ##引用リツイートの場合、元ツイートを追加
             media_txt: str = self._get_tweet_media(tweet, accessor)
             poll_txt: str = self._get_tweet_poll(tweet) ##投票の取得
             quote_txt: str = self._get_tweet_quote(tweet, accessor)
             self._txt_data[0] = '!' + archived_tweet_url + '\n|-\n|\n' \
             poll_txt: str = self._get_tweet_poll(tweet)
                 + '<br>\n'.join(filter(None, [
             self._table_builder.append(
                    self._escape_wiki_reserved_words(tweet_content.get_text()),
                 archived_tweet_url, '<br>\n'.join(
                    quote_txt,
                    filter(
                    media_txt,
                        None, [
                    poll_txt
                            self._table_builder.escape_wiki_reserved_words(
                ])) \
                                tweet_content.get_text()), quote_txt, media_txt, poll_txt])))
                + '\n|-\n' \
                + self._txt_data[0] ##wikiの文法に変化
            self._limit_count += 1 ##記録回数をカウント
            if self._limit_count % self.REPORT_INTERVAL == 0:
                print(f"ツイートを{self._limit_count}件も記録したンゴwwwwwwwwwww")
            if self._stop != '' and self._stop in tweet_content.get_text(): ##目的ツイートか判定
                print("目的ツイート発見でもう尾張屋根")
                self._make_txt()
            if self._limit_count >= self.LIMIT_N_TWEETS: ##上限達成か判定
                print(f"{self.LIMIT_N_TWEETS}件も記録している。もうやめにしませんか。")
                self._make_txt()
 
    def _escape_wiki_reserved_words(self, text: str) -> str:
        """MediaWikiの文法と衝突する文字を無効化する。
 
        Args:
            text str: ツイートの文字列。
 
        Returns:
            str: MediaWikiの文法と衝突する文字がエスケープされたツイートの文字列。
        """
        def escape_nolink_urls(text: str) -> str:
            """Archiveテンプレートの中にないURLがWikiでaタグに変換されないよう無効化する。
 
            Args:
                text str: ツイートの文字列。
 
            Returns:
                str: Archiveテンプレートの中にないURLがnowikiタグでエスケープされた文字列。
            """
            is_in_archive_template: bool = False
            i: int = 0
            while i < len(text):
                if is_in_archive_template:
                    if text[i:i+2] == '}}':
                        is_in_archive_template = False
                        i += 2
                else:
                    if text[i:i+10] == '{{Archive|' or text[i:i+10] == '{{archive|':
                        is_in_archive_template = True
                        i += 10
                    elif text[i:i+8] == 'https://':
                        text = text[:i] + '<nowiki>https://</nowiki>' + text[i+8:]
                        i += 25
                    elif text[i:i+7] == 'http://':
                        text = text[:i] + '<nowiki>http://</nowiki>' + text[i+7:]
                        i += 24
                i += 1
            return text


        text = text.replace('\n', '<br>\n')
            if self._table_builder.count % self.REPORT_INTERVAL == 0:
        text = re.sub(r'^ ', '&nbsp;', text, flags=re.MULTILINE)
                logger.info(
        text = re.sub(r'^([\*#:;])', r'<nowiki>\1</nowiki>', text, flags=re.MULTILINE)
                    f'ツイートを{self._table_builder.count}件も記録したンゴwwwwwwwwwww')
        text = re.sub(r'^----', '<nowiki>----</nowiki>', text, flags=re.MULTILINE)
            if self._stop != '' and self._stop in tweet_content.get_text():  # 目的ツイートか判定
        text = escape_nolink_urls(text)
                logger.info('目的ツイート発見でもう尾張屋根')
        return text
                self._table_builder.dump_file()
            if self._table_builder.count >= self.LIMIT_N_TWEETS:
                logger.info(f'{self.LIMIT_N_TWEETS}件も記録している。もうやめにしませんか。')
                self._table_builder.dump_file()


     def _archive_soup(self, tag: bs4.element.Tag, accessor: AccessorHandler) -> None:
     def _archive_soup(
            self,
            tag: Tag,
            accessor: AccessorHandler) -> None:
         """ツイート内のaタグをテンプレートArchiveの文字列に変化させる。
         """ツイート内のaタグをテンプレートArchiveの文字列に変化させる。


1,105行目: 1,331行目:


         Args:
         Args:
             tag bs4.element.Tag: ツイート内容の本文である ``.media-body`` タグを表すbeautifulsoup4タグ。
             tag Tag: ツイート内容の本文である ``.media-body`` タグを表すbeautifulsoup4タグ。
             accessor AccessorHandler: アクセスハンドラ
             accessor AccessorHandler: アクセスハンドラ
         """
         """
         urls_in_tweet: Final[bs4.element.ResultSet] = tag.find_all('a')
         urls_in_tweet: Final[ResultSet[Tag]] = tag.find_all(
            'a')
         for url in urls_in_tweet:
         for url in urls_in_tweet:
             if url.get('href').startswith('https://') or url.get('href').startswith('http://'): ##先頭にhttpが付いていない物はハッシュタグの検索ページへのリンクなので処理しない
             href: str | list[str] | None = url.get('href')
                 if url.get('href').startswith('https' + self.NITTER_INSTANCE[4:]):
            assert isinstance(href, str)
                     #Nitter上のTwitterへのリンクを直す
 
                     url_link: str = url.get('href').replace('https' + self.NITTER_INSTANCE[4:], self.TWITTER_URL)
            if href.startswith('https://') or href.startswith('http://'):
                     url_link = re.sub('\?.*$', '', url_link)
                # 先頭にhttpが付いていない物はハッシュタグの検索ページへのリンクなので処理しない
                     url.replace_with(self._archive_url(url_link, accessor)) ##テンプレートArchiveに変化
                 if href.startswith('https' + self.NITTER_INSTANCE[4:]):
                 elif url.get('href').startswith('https://nitter.kavin.rocks/'):
                     # Nitter上のTwitterへのリンクを直す
                     #Nitter上のTwitterへのリンクを直す
                     url_link: str = href.replace(
                     url_link: str = url.get('href').replace('https://nitter.kavin.rocks/', self.TWITTER_URL)
                        'https' + self.NITTER_INSTANCE[4:], self.TWITTER_URL)
                     url_link = re.sub('\?.*$', '', url_link)
                     url_link = re.sub('\\?.*$', '', url_link)
                     url.replace_with(self._archive_url(url_link, accessor)) ##テンプレートArchiveに変化
                     url.replace_with(self._archive_url(url_link, accessor))
                 elif self._invidious_pattern.search(url.get('href')):
                 elif href.startswith('https://nitter.kavin.rocks/'):
                     #Nitter上のYouTubeへのリンクをInvidiousのものから直す
                     # Nitter上のTwitterへのリンクを直す
                    url_link: str = url.get('href')
                     url_link: str = href.replace(
                     if re.match('https://[^/]+/[^/]+/', url_link) or re.search('/@[^/]*$', url_link):
                        'https://nitter.kavin.rocks/', self.TWITTER_URL)
                         url_link = self._invidious_pattern.sub('youtube.com', url_link)
                     url_link = re.sub('\\?.*$', '', url_link)
                     url.replace_with(self._archive_url(url_link, accessor))
                 elif self._invidious_pattern.search(href):
                     # Nitter上のYouTubeへのリンクをInvidiousのものから直す
                     if re.match(
                            'https://[^/]+/[^/]+/',
                            href) or re.search(
                            '/@[^/]*$',
                            href):
                         href = self._invidious_pattern.sub('youtube.com', href)
                     else:
                     else:
                         url_link = self._invidious_pattern.sub('youtu.be', url_link)
                         href = self._invidious_pattern.sub('youtu.be', href)
                     url.replace_with(self._archive_url(url_link, accessor)) ##テンプレートArchiveに変化
                     url.replace_with(self._archive_url(href, accessor))
                 elif url.get('href').startswith('https://bibliogram.art/'):
                 elif href.startswith('https://bibliogram.art/'):
                     # Nitter上のInstagramへのリンクをBibliogramのものから直す
                     # Nitter上のInstagramへのリンクをBibliogramのものから直す
                     # Bibliogramは中止されたようなのでそのうちリンクが変わるかも
                     # Bibliogramは中止されたようなのでそのうちリンクが変わるかも
                     url_link: str = url.get('href').replace('https://bibliogram.art/', 'https://www.instagram.com/')
                     url_link: str = href.replace(
                     url.replace_with(self._archive_url(url_link, accessor)) ##テンプレートArchiveに変化
                        'https://bibliogram.art/',
                        'https://www.instagram.com/')
                     url.replace_with(self._archive_url(url_link, accessor))
                 else:
                 else:
                     url.replace_with(self._archive_url(url.get('href'), accessor)) ##テンプレートArchiveに変化
                     url.replace_with(self._archive_url(href, accessor))
             elif url.text.startswith('@'):
             elif url.text.startswith('@'):
                 url_link: str = urljoin(self.TWITTER_URL, url.get('href'))
                 url_link: str = urljoin(self.TWITTER_URL, href)
                 url_text: str = url.text
                 url_text: str = url.text
                 url.replace_with(self._archive_url(url_link, accessor, url_text)) ##テンプレートArchiveに変化
                 url.replace_with(
                    self._archive_url(
                        url_link, accessor, url_text))


     def _archive_url(self, url: Final[str], accessor: AccessorHandler, text: Final[str|None] = None) -> str:
     def _archive_url(
            self,
            url: str,
            accessor: AccessorHandler,
            text: str | None = None) -> str:
         """URLをArchiveテンプレートでラップする。
         """URLをArchiveテンプレートでラップする。


1,147行目: 1,391行目:


         Args:
         Args:
             url Final[str]: ラップするURL。
             url str: ラップするURL。
             accessor AccessorHandler: アクセスハンドラ
             accessor AccessorHandler: アクセスハンドラ。
             text Final[str|None]: ArchiveテンプレートでURLの代わりに表示する文字列。
             text str|None: ArchiveテンプレートでURLの代わりに表示する文字列。


         Returns:
         Returns:
             str: ArchiveタグでラップしたURL。
             str: ArchiveタグでラップしたURL。
         """
         """
         if '#' in url: # フラグメント識別子の処理
         if '#' in url: # フラグメント識別子の処理
             main_url, fragment = url.split('#', maxsplit=1)
             main_url, fragment = url.split('#', maxsplit=1)
             if text is None:
             return self._table_builder.archive_url(
                return '{{Archive|1=' + unquote(url) + '|2=' + self._archive(main_url, accessor) + '#' + fragment + '}}' ##テンプレートArchiveの文字列返す
                 url, self._archive(main_url, accessor) + '#' + fragment, text)
            else:
                 return '{{Archive|1=' + unquote(url) + '|2=' + self._archive(main_url, accessor) + '#' + fragment + + '|3=' + text + '}}' ##テンプレートArchiveの文字列返す
         else:
         else:
             if text is None:
             return self._table_builder.archive_url(
                return '{{Archive|1=' + unquote(url) + '|2=' + self._archive(url, accessor) + '}}' ##テンプレートArchiveの文字列返す
                 url, self._archive(url, accessor), text)
            else:
                 return '{{Archive|1=' + unquote(url) + '|2=' + self._archive(url, accessor) + '|3=' + text + '}}' ##テンプレートArchiveの文字列返す


     def _callinshowlink_url(self, url: Final[str], accessor: AccessorHandler) -> str:
     def _callinshowlink_url(self, url: str, accessor: AccessorHandler) -> str:
         """URLをCallinShowLinkテンプレートでラップする。
         """URLをCallinShowLinkテンプレートでラップする。


         Args:
         Args:
             url Final[str]: ラップするURL。
             url str: ラップするURL。
             accessor AccessorHandler: アクセスハンドラ
             accessor AccessorHandler: アクセスハンドラ。


         Returns:
         Returns:
             str: CallinShowLinkタグでラップしたURL。
             str: CallinShowLinkタグでラップしたURL。
         """
         """
         return '{{CallinShowLink|1=' + url + '|2=' + self._archive(url, accessor) + '}}'
         return self._table_builder.callinshowlink_url(
            url, self._archive(url, accessor))


     def _archive(self, url: Final[str], accessor: AccessorHandler) -> str:
     def _archive(self, url: str, accessor: AccessorHandler) -> str:
         """URLから対応するarchive.todayのURLを返す。
         """URLから対応するarchive.todayのURLを返す。


1,186行目: 1,427行目:


         Args:
         Args:
             url Final[str]: 魚拓を取得するURL。
             url str: 魚拓を取得するURL。
             accessor AccessorHandler: アクセスハンドラ
             accessor AccessorHandler: アクセスハンドラ


1,192行目: 1,433行目:
             str: 魚拓のURL。
             str: 魚拓のURL。
         """
         """
         archive_url: str = urljoin(self.ARCHIVE_TODAY_STANDARD, quote(unquote(url), safe='&=+?%')) ##wikiに載せるとき用URLで失敗するとこのままhttps://archive.ph/https%3A%2F%2Fxxxxxxxxの形で返される
         archive_url: str = urljoin(
         res: Final[str | None] = accessor.request(urljoin(self.ARCHIVE_TODAY, quote(unquote(url), safe='&=+?%'))) ##アクセス用URL使って結果を取得
            self.ARCHIVE_TODAY_STANDARD,
         if res is None: ##魚拓接続失敗時処理
            quote(
             print(archive_url + 'にアクセス失敗ナリ。出力されるテキストにはそのまま記載されるナリ。')
                unquote(url),
                safe='&=+?%')) # wikiに載せるとき用URLで失敗するとこのままhttps://archive.ph/https%3A%2F%2Fxxxxxxxxの形で返される
         res: Final[str | None] = accessor.request(urljoin(
            self.ARCHIVE_TODAY, quote(unquote(url), safe='&=+?%')))
         if res is None: # 魚拓接続失敗時処理
             logger.error(archive_url + 'にアクセス失敗ナリ。出力されるテキストにはそのまま記載されるナリ。')
         else:
         else:
             soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') ##beautifulsoupでレスポンス解析
             soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser')
             content: bs4.element.Tag = soup.find(id="CONTENT") ##archive.todayの魚拓一覧ページの中身だけ取得
             content: Tag = soup.find(
             if content is None or content.get_text()[:len(self.NO_ARCHIVE)] == self.NO_ARCHIVE: ##魚拓があるかないか判定
                id='CONTENT') # archive.todayの魚拓一覧ページの中身だけ取得
                 print(url + "の魚拓がない。これはいけない。")
             if content is None or content.get_text()[:len(
                    self.NO_ARCHIVE)] == self.NO_ARCHIVE: # 魚拓があるかないか判定
                 logger.warning(url + 'の魚拓がない。これはいけない。')
             else:
             else:
                 archive_url = content.find('a').get('href').replace(self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD)
                 archive_url = content.find('a').get('href').replace(
                    self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD)
         return archive_url
         return archive_url


1,213行目: 1,462行目:
             accessor AccessorHandler: アクセスハンドラ
             accessor AccessorHandler: アクセスハンドラ
         """
         """
         soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser') ##beautifulsoupでレスポンス解析
         soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser')
         show_mores: Final[bs4.element.ResultSet] = soup.find_all(class_="show-more")
         show_mores: Final[ResultSet[Tag]
         new_url: str = '' # ここで定義しないと動かなくなった、FIXME?
                          ] = soup.find_all(class_='show-more')
         for show_more in show_mores: ##show-moreに次ページへのリンクか前ページへのリンクがある
         new_url: str = '' # ここで定義しないと動かなくなった、FIXME?
             if show_more.text != self.NEWEST:  ##前ページへのリンクではないか判定
         for show_more in show_mores: # show-moreに次ページへのリンクか前ページへのリンクがある
                 new_url = urljoin(self.NITTER_INSTANCE, self.CALLINSHOW + '/' + self.TWEETS_OR_REPLIES + show_more.a.get('href')) ##直下のaタグのhrefの中身取ってURL頭部分と合体
             if show_more.text != self.NEWEST:  # 前ページへのリンクではないか判定
         res: Final[str | None] = accessor.request(new_url) ##接続してHTML取ってくる
                 new_url = urljoin(
                    self.NITTER_INSTANCE,
                    self._name
                    + '/'
                    + self.TWEETS_OR_REPLIES
                    + show_more.a.get('href')) # 直下のaタグのhrefの中身取ってURL頭部分と合体
         res: Final[str | None] = accessor.request(new_url)
         if res is None:
         if res is None:
             self._fail()
             self._fail()
         new_page_soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') ##beautifulsoupでレスポンス解析
         new_page_soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser')
         if new_page_soup.find(class_="timeline-end") is None: ##ツイートの終端ではtimeline-endだけのページになるので判定
         if new_page_soup.find(
             print(new_url + 'に移動しますを')
                class_='timeline-end') is None:
             self._page = res ##まだ残りツイートがあるのでページを返して再度ツイート本文収集
            # ツイートの終端ではtimeline-endだけのページになるので判定
             logger.info(new_url + 'に移動しますを')
             self._page = res # まだ残りツイートがあるのでページを返して再度ツイート本文収集
         else:
         else:
             print("急に残りツイートが無くなったな終了するか")
             logger.info('急に残りツイートが無くなったな終了するか')
             self._make_txt()
             self._table_builder.dump_file()


     def execute(self, krsw: bool=False, use_browser: bool=True, enable_javascript: bool=True) -> NoReturn:
     def execute(self, krsw: bool = False, use_browser: bool = True,
                enable_javascript: bool = True) -> NoReturn:
         """通信が必要な部分のロジック。
         """通信が必要な部分のロジック。


1,243行目: 1,501行目:
             self._check_nitter_instance(accessor)
             self._check_nitter_instance(accessor)
             self._check_archive_instance(accessor)
             self._check_archive_instance(accessor)
             ##Invidiousのインスタンスリストの正規表現パターンを取得
             # Invidiousのインスタンスリストの正規表現パターンを取得
             invidious_url_tuple: Final[tuple[str]] = self._invidious_instances(accessor)
             invidious_url_tuple: Final[tuple[str, ...]
             self._invidious_pattern: Final[re.Pattern] = re.compile('|'.join(invidious_url_tuple))
                                      ] = self._invidious_instances(accessor)
             self._invidious_pattern: re.Pattern[str] = re.compile(
                '|'.join(invidious_url_tuple))


             # 検索クエリの設定
             # 検索クエリの設定
1,257行目: 1,517行目:


if __name__ == '__main__':
if __name__ == '__main__':
     if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < 10):
     if sys.version_info.major < 3 or (
         print('Pythonのバージョンを3.10以上に上げて下さい', file=sys.stderr)
            sys.version_info.major == 3 and sys.version_info.minor < 11):
         logger.critical('Pythonのバージョンを3.11以上に上げて下さい')
         exit(1)
         exit(1)
     parser: ArgumentParser = ArgumentParser()
     parser: ArgumentParser = ArgumentParser()
     parser.add_argument("--krsw", action='store_true', help="指定すると、パカデブのツイートを取得上限数まで取得する。")
     parser.add_argument(
     parser.add_argument("-n", "--no_use_browser", action='store_true', help="指定すると、Tor Browserを利用しない。")
        '--krsw',
     parser.add_argument("-d", "--disable_script", action='store_true', help="指定すると、Tor BrowserでJavaScriptを利用しない。")
        action='store_true',
        help='指定すると、パカデブのツイートを取得上限数まで取得する。')
     parser.add_argument(
        '-n',
        '--no_browser',
        action='store_true',
        help='指定すると、Tor Browserを利用しない。')
     parser.add_argument(
        '-d',
        '--disable_script',
        action='store_true',
        help='指定すると、Tor BrowserでJavaScriptを利用しない。')
     args: Namespace = parser.parse_args()
     args: Namespace = parser.parse_args()


     twitter_archiver: TwitterArchiver = TwitterArchiver()
     twitter_archiver: TwitterArchiver = TwitterArchiver()
     twitter_archiver.execute(args.krsw, not args.no_use_browser, not args.disable_script)
     twitter_archiver.execute(
        args.krsw,
        not args.no_browser,
        not args.disable_script)
‎</syntaxhighlight>
‎</syntaxhighlight>


匿名利用者