91
回編集
>Fet-Fe (→コード: v4.3.2 URL一覧に重複が出ないよう修正) |
(→コード: v4.4.2 WikiのURLをkrsw-wiki.inに変更) |
||
(他の1人の利用者による、間の8版が非表示) | |||
11行目: | 11行目: | ||
"""Twitter自動収集スクリプト | """Twitter自動収集スクリプト | ||
ver4. | ver4.4.2 2024/11/9恒心 | ||
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です。 | 当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です。 | ||
19行目: | 19行目: | ||
Examples: | Examples: | ||
定数類は状況に応じて変えてください。:class:`~UserProperties` で変更できます。 | 定数類は状況に応じて変えてください。:class:`~UserProperties` で変更できます。 | ||
:: | :: | ||
24行目: | 25行目: | ||
オプションに ``--krsw`` とつけると自動モードになります。 | オプションに ``--krsw`` とつけると自動モードになります。 | ||
:: | :: | ||
33行目: | 35行目: | ||
``--no-browser`` オプションでTor Browserを使用しないモードに、 | ``--no-browser`` オプションでTor Browserを使用しないモードに、 | ||
``--disable-script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。 | ``--disable-script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。 | ||
``--search-unarchived`` | ``--search-unarchived`` オプションでは、従来の通りNitterからツイートを収集するモードになります (廃止予定)。 | ||
Note: | Note: | ||
49行目: | 51行目: | ||
* requests, typing_extensionsも環境によっては入っていないかもしれない | * requests, typing_extensionsも環境によっては入っていないかもしれない | ||
.. code-block:: bash | |||
$ python3 -m pip install bs4 requests PySocks selenium typing_extensions | |||
* pipも入っていなければ ``$ sudo apt install pip`` | * pipも入っていなければ ``$ sudo apt install pip`` | ||
* `ffmpeg <https://ffmpeg.org>`_ が入っていると動画も自動取得しますが、無くても動きます | * `ffmpeg <https://ffmpeg.org>`_ が入っていると動画も自動取得しますが、無くても動きます | ||
* バグ報告は `利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて \ | * バグ報告は `利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて \ | ||
<https://krsw-wiki. | <https://krsw-wiki.in/wiki/利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて>`_ | ||
""" | """ | ||
import json | import json | ||
import logging | import logging | ||
76行目: | 79行目: | ||
from datetime import datetime | from datetime import datetime | ||
from enum import Enum | from enum import Enum | ||
from | from pathlib import Path | ||
from time import sleep | from time import sleep | ||
from traceback import TracebackException | from traceback import TracebackException | ||
106行目: | 109行目: | ||
""" | """ | ||
media_dir: Final[str] = 'tweet_media' | media_dir: Final[str] = 'tweet_media' | ||
"""Final[str]: 画像などのツイートメディアをダウンロードするディレクトリ。 | """Final[str]: 画像などのツイートメディアをダウンロードするディレクトリ。""" | ||
filename: Final[str] = 'tweet.txt' | filename: Final[str] = 'tweet.txt' | ||
"""Final[str]: ツイートを保存するファイルの名前。 | """Final[str]: ツイートを保存するファイルの名前。""" | ||
log_file: Final[str] = 'twitter_archiver.log' | log_file: Final[str] = 'twitter_archiver.log' | ||
"""Final[str]: ログを保存するファイルの名前。 | """Final[str]: ログを保存するファイルの名前。""" | ||
@dataclass(init=False, eq=False, frozen=True) | @dataclass(init=False, eq=False, frozen=True) | ||
class NitterCrawler: | class NitterCrawler: | ||
"""``--search-unarchived`` オプション有りの時に利用する設定値。 | """``--search-unarchived`` オプション有りの時に利用する設定値。""" | ||
limit_n_tweets: Final[int] = 100 | limit_n_tweets: Final[int] = 100 | ||
"""Final[int]: 取得するツイート数の上限。 | """Final[int]: 取得するツイート数の上限。""" | ||
report_interval: Final[int] = 5 | report_interval: Final[int] = 5 | ||
"""Final[int]: 記録件数を報告するインターバル。 | """Final[int]: 記録件数を報告するインターバル。""" | ||
nitter_instance: Final[str] = 'https://nitter. | nitter_instance: Final[str] = 'https://nitter.poast.org/' # noqa: E501 | ||
"""Final[str]: Nitterのインスタンス。 | """Final[str]: Nitterのインスタンス。 | ||
143行目: | 140行目: | ||
@dataclass(init=False, eq=False, frozen=True) | @dataclass(init=False, eq=False, frozen=True) | ||
class ArchiveCrawler: | class ArchiveCrawler: | ||
"""``--search-unarchived`` オプション無しの時に使用する設定値。 | """``--search-unarchived`` オプション無しの時に使用する設定値。""" | ||
url_list_filename: Final[str] = 'url_list.txt' | url_list_filename: Final[str] = 'url_list.txt' | ||
"""Final[str]: URLのリストをダンプするファイル名。 | """Final[str]: URLのリストをダンプするファイル名。""" | ||
170行目: | 165行目: | ||
class AccessError(Exception): | class AccessError(Exception): | ||
"""RequestsとSeleniumで共通のアクセスエラー。 | """RequestsとSeleniumで共通のアクセスエラー。""" | ||
pass | pass | ||
class ReCaptchaRequiredError(Exception): | class ReCaptchaRequiredError(Exception): | ||
"""JavaScriptがオフの時にreCAPTCHAを要求された場合のエラー。 | """JavaScriptがオフの時にreCAPTCHAを要求された場合のエラー。""" | ||
pass | pass | ||
class AbstractAccessor(metaclass=ABCMeta): | class AbstractAccessor(metaclass=ABCMeta): | ||
"""HTTPリクエストでWebサイトに接続するための基底クラス。 | """HTTPリクエストでWebサイトに接続するための基底クラス。""" | ||
WAIT_TIME: Final[int] = 1 | WAIT_TIME: Final[int] = 1 | ||
198行目: | 190行目: | ||
WAIT_RANGE: Final[int] = 5 | WAIT_RANGE: Final[int] = 5 | ||
"""Final[int]: ランダムな時間待機するときの待機時間の幅(秒)。 | """Final[int]: ランダムな時間待機するときの待機時間の幅(秒)。""" | ||
REQUEST_TIMEOUT: Final[int] = 30 | REQUEST_TIMEOUT: Final[int] = 30 | ||
"""Final[int]: HTTPリクエストのタイムアウト秒数。 | """Final[int]: HTTPリクエストのタイムアウト秒数。""" | ||
@abstractmethod | @abstractmethod | ||
240行目: | 230行目: | ||
class RequestsAccessor(AbstractAccessor): | class RequestsAccessor(AbstractAccessor): | ||
"""RequestsモジュールでWebサイトに接続するためのクラス。 | """RequestsモジュールでWebサイトに接続するためのクラス。""" | ||
HEADERS: Final[dict[str, str]] = { | HEADERS: Final[dict[str, str]] = { | ||
247行目: | 236行目: | ||
'Mozilla/5.0 (X11; Linux i686; rv:109.0) Gecko/20100101 Firefox/120.0' # noqa: E501 | 'Mozilla/5.0 (X11; Linux i686; rv:109.0) Gecko/20100101 Firefox/120.0' # noqa: E501 | ||
} | } | ||
"""Final[dict[str, str]]: HTTPリクエスト時のヘッダ。 | """Final[dict[str, str]]: HTTPリクエスト時のヘッダ。""" | ||
PROXIES_WITH_BROWSER: Final[dict[str, str]] = { | PROXIES_WITH_BROWSER: Final[dict[str, str]] = { | ||
254行目: | 242行目: | ||
'https': 'socks5h://127.0.0.1:9150' | 'https': 'socks5h://127.0.0.1:9150' | ||
} | } | ||
"""Final[dict[str, str]]: Tor Browserを起動しているときのHTTPプロキシの設定。 | """Final[dict[str, str]]: Tor Browserを起動しているときのHTTPプロキシの設定。""" | ||
PROXIES_WITH_COMMAND: Final[dict[str, str]] = { | PROXIES_WITH_COMMAND: Final[dict[str, str]] = { | ||
261行目: | 248行目: | ||
'https': 'socks5h://127.0.0.1:9050' | 'https': 'socks5h://127.0.0.1:9050' | ||
} | } | ||
"""Final[dict[str, str]]: torコマンドを起動しているときのHTTPプロキシの設定。 | """Final[dict[str, str]]: torコマンドを起動しているときのHTTPプロキシの設定。""" | ||
TOR_CHECK_URL: Final[str] = 'https://check.torproject.org/api/ip' | TOR_CHECK_URL: Final[str] = 'https://check.torproject.org/api/ip' | ||
"""Final[str]: Tor経由で通信しているかチェックするサイトのURL。 | """Final[str]: Tor経由で通信しているかチェックするサイトのURL。""" | ||
def __init__(self) -> None: | def __init__(self) -> None: | ||
# Torに必要なプロキシをセット | # Torに必要なプロキシをセット | ||
self._cookies: dict[str, str] = {} | self._cookies: dict[str, str] = {} | ||
375行目: | 358行目: | ||
else: | else: | ||
raise AccessError('サイトにTorのIPでアクセスできていないなりを') | raise AccessError('サイトにTorのIPでアクセスできていないなりを') | ||
except requests.exceptions.ConnectionError | except requests.exceptions.ConnectionError: | ||
logger.critical('通信がTorのSOCKS proxyを経由していないなりを', exc_info=True) | |||
logger.critical('通信がTorのSOCKS proxyを経由していないなりを') | |||
sys.exit(1) | sys.exit(1) | ||
@property | @property | ||
def proxies(self) -> dict[str, str] | None: | def proxies(self) -> dict[str, str] | None: | ||
""" | """dict[str, str] | None: プロキシ設定。""" | ||
return self._proxies | return self._proxies | ||
392行目: | 370行目: | ||
class SeleniumAccessor(AbstractAccessor): | class SeleniumAccessor(AbstractAccessor): | ||
"""SeleniumでWebサイトに接続するためのクラス。 | """SeleniumでWebサイトに接続するためのクラス。 | ||
Args: | |||
enable_javascript (bool): JavaScriptを有効にするかどうか。reCAPTCHA対策に必要。 | |||
""" | """ | ||
399行目: | 380行目: | ||
'Linux': '/usr/bin/torbrowser' | 'Linux': '/usr/bin/torbrowser' | ||
}) | }) | ||
"""Final[MappingProxyType[str, str]]: OSごとのTor Browserのパス。 | """Final[MappingProxyType[str, str]]: OSごとのTor Browserのパス。""" | ||
WEB_DRIVER_WAIT_TIME: Final[int] = 15 | WEB_DRIVER_WAIT_TIME: Final[int] = 15 | ||
"""Final[int]: 最初のTor接続時の待機時間(秒)。 | """Final[int]: 最初のTor接続時の待機時間(秒)。""" | ||
WAIT_TIME_FOR_RECAPTCHA: Final[int] = 10_000 | WAIT_TIME_FOR_RECAPTCHA: Final[int] = 10_000 | ||
"""Final[int]: reCAPTCHAのための待機時間(秒)。 | """Final[int]: reCAPTCHAのための待機時間(秒)。""" | ||
def __init__(self, enable_javascript: bool) -> None: | def __init__(self, enable_javascript: bool) -> None: | ||
self._options: Final[FirefoxOptions] = FirefoxOptions() | |||
self._options: Final[FirefoxOptions] = FirefoxOptions() | |||
self._options.binary_location = ( | self._options.binary_location = ( | ||
self.TOR_BROWSER_PATHS[platform.system()] | self.TOR_BROWSER_PATHS[platform.system()] | ||
436行目: | 407行目: | ||
def quit(self) -> None: | def quit(self) -> None: | ||
"""Seleniumドライバを終了する。 | """Seleniumドライバを終了する。""" | ||
if hasattr(self, '_driver'): | if hasattr(self, '_driver'): | ||
logger.debug('ブラウザ終了') | logger.debug('ブラウザ終了') | ||
443行目: | 413行目: | ||
def _refresh_browser(self) -> None: | def _refresh_browser(self) -> None: | ||
"""ブラウザを起動する。 | """ブラウザを起動する。""" | ||
try: | try: | ||
logger.debug('ブラウザ起動') | logger.debug('ブラウザ起動') | ||
468行目: | 437行目: | ||
Args: | Args: | ||
url (str): アクセスしようとしているURL。 | url (str): アクセスしようとしているURL。\ | ||
reCAPTCHAが要求されると `current_url` が変わることがあるので必要。 | reCAPTCHAが要求されると `current_url` が変わることがあるので必要。 | ||
482行目: | 451行目: | ||
print('reCAPTCHAを解いてね(笑)、それはできるよね。\a\a\a') | print('reCAPTCHAを解いてね(笑)、それはできるよね。\a\a\a') | ||
print('botバレしたら自動でブラウザが再起動するナリよ') | print('botバレしたら自動でブラウザが再起動するナリよ') | ||
print('Tips: カーソルを迷ったように動かすとか、人間らしく振る舞うのがコツナリ') | |||
WebDriverWait(self._driver, self.WEB_DRIVER_WAIT_TIME).until( | WebDriverWait(self._driver, self.WEB_DRIVER_WAIT_TIME).until( | ||
ec.presence_of_element_located( | ec.presence_of_element_located( | ||
522行目: | 492行目: | ||
self._driver.get(url) | self._driver.get(url) | ||
WebDriverWait(self._driver, self.WEB_DRIVER_WAIT_TIME).until_not( | WebDriverWait(self._driver, self.WEB_DRIVER_WAIT_TIME).until_not( | ||
ec.title_is('Redirecting')) # Nitterのリダイレクトを検知する | ec.any_of( | ||
ec.title_is('Redirecting'), | |||
ec.title_is('Verifying your browser | Nitter') | |||
) | |||
) # Nitterのリダイレクトを検知する | |||
self._check_recaptcha(url) | self._check_recaptcha(url) | ||
except WebDriverException as e: | except WebDriverException as e: | ||
# Selenium固有の例外を共通の例外に変換 | # Selenium固有の例外を共通の例外に変換 | ||
raise AccessError from e | raise AccessError from e | ||
sleep(5) | |||
return self._driver.page_source | return self._driver.page_source | ||
541行目: | 516行目: | ||
RequestsとSeleniumのどちらかを選択して使用することができ、その違いを隠蔽する。 | RequestsとSeleniumのどちらかを選択して使用することができ、その違いを隠蔽する。 | ||
Args: | |||
use_browser (bool): `True` ならSeleniumを利用する。`False` ならRequestsのみでアクセスする。 | |||
enable_javascript (bool): SeleniumでJavaScriptを利用する場合は `True`。 | |||
""" | """ | ||
LIMIT_N_REQUESTS: Final[int] = 5 | LIMIT_N_REQUESTS: Final[int] = 5 | ||
"""Final[int]: HTTPリクエスト失敗時の再試行回数。 | """Final[int]: HTTPリクエスト失敗時の再試行回数。""" | ||
WAIT_TIME_FOR_ERROR: Final[int] = 4 | WAIT_TIME_FOR_ERROR: Final[int] = 4 | ||
"""Final[int]: HTTPリクエスト失敗時にさらに追加する待機時間。 | """Final[int]: HTTPリクエスト失敗時にさらに追加する待機時間。""" | ||
def __init__(self, use_browser: bool, enable_javascript: bool) -> None: | def __init__(self, use_browser: bool, enable_javascript: bool) -> None: | ||
self._selenium_accessor: Final[SeleniumAccessor | None] = ( | self._selenium_accessor: Final[SeleniumAccessor | None] = ( | ||
SeleniumAccessor(enable_javascript) if use_browser else None | SeleniumAccessor(enable_javascript) if use_browser else None | ||
568行目: | 536行目: | ||
def __enter__(self) -> Self: | def __enter__(self) -> Self: | ||
return self | return self | ||
579行目: | 542行目: | ||
exc_value: BaseException | None, | exc_value: BaseException | None, | ||
traceback: TracebackType | None) -> None: | traceback: TracebackType | None) -> None: | ||
if self._selenium_accessor is not None: | if self._selenium_accessor is not None: | ||
self._selenium_accessor.quit() | self._selenium_accessor.quit() | ||
613行目: | 569行目: | ||
def _request_with_callable[T]( | def _request_with_callable[T]( | ||
self, | |||
url: str, | |||
request_callable: Callable[[str], T] | |||
) -> T | None: | |||
"""`request_callable` の実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | """`request_callable` の実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | ||
634行目: | 591行目: | ||
for i in range(self.LIMIT_N_REQUESTS): | for i in range(self.LIMIT_N_REQUESTS): | ||
try: | try: | ||
res: | res: T = request_callable(url) | ||
except AccessError: | except AccessError: | ||
logger.warning( | logger.warning( | ||
url + 'への通信失敗ナリ ' | url + 'への通信失敗ナリ ' | ||
f'{i + 1}/{self.LIMIT_N_REQUESTS}回') | f'{i + 1}/{self.LIMIT_N_REQUESTS}回') | ||
logger.debug('エラーログ', exc_info=True) | |||
if i < self.LIMIT_N_REQUESTS: | if i < self.LIMIT_N_REQUESTS: | ||
sleep(self.WAIT_TIME_FOR_ERROR) # 失敗時は長めに待つ | sleep(self.WAIT_TIME_FOR_ERROR) # 失敗時は長めに待つ | ||
710行目: | 668行目: | ||
@property | @property | ||
def last_url(self) -> str: | def last_url(self) -> str: | ||
""" | """str: 最後にアクセスしたURL。""" | ||
return self._last_url | return self._last_url | ||
@property | @property | ||
def proxies(self) -> dict[str, str] | None: | def proxies(self) -> dict[str, str] | None: | ||
""" | """dict[str, str] | None: RequestsAccessorオブジェクトのプロキシ設定。""" | ||
return self._requests_accessor.proxies | return self._requests_accessor.proxies | ||
729行目: | 679行目: | ||
class TableBuilder: | class TableBuilder: | ||
"""Wikiの表を組み立てるためのクラス。 | """Wikiの表を組み立てるためのクラス。 | ||
Args: | |||
date (datetime | None, optional): 記録するツイートの最新日付。デフォルトは今日の日付。 | |||
Attributes: | |||
_tables (list[str]): ツイートのリストを日毎にまとめたもの。\ | |||
一番最後の要素が `_date` に対応し、最初の要素が最近の日付となる。 | |||
_count (int): 表に追加したツイートの件数。 | |||
_date (datetime): 現在収集中のツイートの日付。 | |||
""" | """ | ||
FILENAME: Final[str] = UserProperties.filename | FILENAME: Final[str] = UserProperties.filename | ||
"""Final[str]: ツイートを保存するファイルの名前。 | """Final[str]: ツイートを保存するファイルの名前。""" | ||
def __init__(self, date: datetime | None = None) -> None: | def __init__(self, date: datetime | None = None) -> None: | ||
self._tables: Final[list[str]] = [''] | |||
self._tables: Final[list[str]] = [''] | |||
self._count: int = 0 # 記録数 | self._count: int = 0 # 記録数 | ||
self._date: datetime = date or datetime.today() | self._date: datetime = date or datetime.today() | ||
747行目: | 699行目: | ||
@property | @property | ||
def count(self) -> int: | def count(self) -> int: | ||
""" | """int: 表に追加したツイートの件数。""" | ||
return self._count | return self._count | ||
769行目: | 717行目: | ||
def dump_file(self) -> None: | def dump_file(self) -> None: | ||
"""Wikiテーブルをファイル出力する。 | """Wikiテーブルをファイル出力する。""" | ||
self._next_day() | self._next_day() | ||
Path(self.FILENAME).write_text( | |||
'\n'.join(reversed(self._tables)), 'utf-8' | |||
) | |||
logger.info('テキストファイル手に入ったやで〜') | logger.info('テキストファイル手に入ったやで〜') | ||
868行目: | 814行目: | ||
if not hasattr(cls, '_escape_callables'): | if not hasattr(cls, '_escape_callables'): | ||
# 初回呼び出しの時だけ正規表現をコンパイルする | # 初回呼び出しの時だけ正規表現をコンパイルする | ||
head_space_pattern: Final[Pattern[str]] = re.compile( | head_space_pattern: Final[re.Pattern[str]] = re.compile( | ||
r'^ ', re.MULTILINE) | r'^ ', re.MULTILINE) | ||
head_marks_pattern: Final[Pattern[str]] = re.compile( | head_marks_pattern: Final[re.Pattern[str]] = re.compile( | ||
r'^([\*#:;])', re.MULTILINE) | r'^([\*#:;])', re.MULTILINE) | ||
bar_pattern: Final[Pattern[str]] = re.compile( | bar_pattern: Final[re.Pattern[str]] = re.compile( | ||
r'^----', re.MULTILINE) | r'^----', re.MULTILINE) | ||
890行目: | 836行目: | ||
@staticmethod | @staticmethod | ||
def archive_url( | def archive_url( | ||
url: str, | |||
archived_url: str, | |||
text: str | None = None | |||
) -> str: | |||
"""URLをArchiveテンプレートでラップする。 | """URLをArchiveテンプレートでラップする。 | ||
924行目: | 871行目: | ||
class FfmpegStatus(Enum): | class FfmpegStatus(Enum): | ||
"""ffmpegでの動画保存ステータス。 | """ffmpegでの動画保存ステータス。""" | ||
MP4 = 1 | MP4 = 1 | ||
"""mp4の取得に成功したときのステータス。 | """mp4の取得に成功したときのステータス。""" | ||
TS = 2 | TS = 2 | ||
"""tsの取得までは成功したが、mp4への変換に失敗したときのステータス。 | """tsの取得までは成功したが、mp4への変換に失敗したときのステータス。""" | ||
FAILED = 3 | FAILED = 3 | ||
"""m3u8からtsの取得に失敗したときのステータス。 | """m3u8からtsの取得に失敗したときのステータス。""" | ||
982行目: | 925行目: | ||
""" | """ | ||
TWITTER_URL: Final[str] = 'https:// | TWITTER_URL: Final[str] = 'https://x.com/' | ||
"""Final[str]: TwitterのURL。 | """Final[str]: TwitterのURL。 | ||
995行目: | 938行目: | ||
INVIDIOUS_INSTANCES_URL: Final[str] = \ | INVIDIOUS_INSTANCES_URL: Final[str] = \ | ||
'https://api.invidious.io/instances.json' | 'https://api.invidious.io/instances.json' | ||
"""Final[str]: Invidiousのインスタンスのリストを取得するAPIのURL。 | """Final[str]: Invidiousのインスタンスのリストを取得するAPIのURL。""" | ||
INVIDIOUS_INSTANCES_TUPLE: Final[tuple[str, ...]] = ( | INVIDIOUS_INSTANCES_TUPLE: Final[tuple[str, ...]] = ( | ||
'piped.kavin.rocks', | 'piped.kavin.rocks', | ||
'piped.video' | 'piped.video', | ||
'invidious.poast.org' | |||
) | ) | ||
"""Final[tuple[str, ...]]: よく使われるInvidiousインスタンスのリスト。 | """Final[tuple[str, ...]]: よく使われるInvidiousインスタンスのリスト。 | ||
1,009行目: | 952行目: | ||
CALLINSHOW: Final[str] = 'CallinShow' | CALLINSHOW: Final[str] = 'CallinShow' | ||
"""Final[str]: 降臨ショーのユーザーネーム。 | """Final[str]: 降臨ショーのユーザーネーム。""" | ||
LIMIT_N_TWEETS: Final[int] = UserProperties.NitterCrawler.limit_n_tweets | LIMIT_N_TWEETS: Final[int] = UserProperties.NitterCrawler.limit_n_tweets | ||
"""Final[int]: 取得するツイート数の上限。 | """Final[int]: 取得するツイート数の上限。""" | ||
REPORT_INTERVAL: Final[int] = UserProperties.NitterCrawler.report_interval | REPORT_INTERVAL: Final[int] = UserProperties.NitterCrawler.report_interval | ||
"""Final[int]: 記録件数を報告するインターバル。 | """Final[int]: 記録件数を報告するインターバル。""" | ||
TWEETS_OR_REPLIES: Final[str] = 'with_replies' | TWEETS_OR_REPLIES: Final[str] = 'with_replies' | ||
"""Final[str]: NitterのURLのドメインとユーザーネーム部分の接続部品。 | """Final[str]: NitterのURLのドメインとユーザーネーム部分の接続部品。""" | ||
NITTER_ERROR_TITLE: Final[str] = 'Error|nitter' | NITTER_ERROR_TITLE: Final[str] = 'Error|nitter' | ||
1,043行目: | 982行目: | ||
MEDIA_DIR: Final[str] = UserProperties.media_dir | MEDIA_DIR: Final[str] = UserProperties.media_dir | ||
"""Final[str]: 画像などのツイートメディアをダウンロードするディレクトリ。 | """Final[str]: 画像などのツイートメディアをダウンロードするディレクトリ。""" | ||
def __init__(self) -> None: | def __init__(self) -> None: | ||
self._check_constants() # スラッシュが抜けてないかチェック | self._check_constants() # スラッシュが抜けてないかチェック | ||
self._has_ffmpeg: Final[bool] = self._check_ffmpeg() # ffmpegがあるかチェック | self._has_ffmpeg: Final[bool] = self._check_ffmpeg() # ffmpegがあるかチェック | ||
self._img_ext_pattern: Final[Pattern[str]] = re.compile( | self._img_ext_pattern: Final[re.Pattern[str]] = re.compile( | ||
r'%2F([^%]*\.(?:jpg|jpeg|png|gif))') | r'%2F([^%]*\.(?:jpg|jpeg|png|gif))') | ||
self._url_fragment_pattern: Final[Pattern[str]] = re.compile( | self._url_fragment_pattern: Final[re.Pattern[str]] = re.compile( | ||
r'#[^#]*$') | r'#[^#]*$') | ||
self._url_query_pattern: Final[Pattern[str]] = re.compile(r'\?.*$') | self._url_query_pattern: Final[re.Pattern[str]] = re.compile(r'\?.*$') | ||
def _set_queries(self, accessor: AccessorHandler, krsw: | def _set_queries( | ||
self, accessor: AccessorHandler, krsw: str | None | |||
) -> bool: | |||
"""検索条件を設定する。 | """検索条件を設定する。 | ||
1,066行目: | 1,004行目: | ||
Args: | Args: | ||
accessor (AccessorHandler): アクセスハンドラ | accessor (AccessorHandler): アクセスハンドラ | ||
krsw ( | krsw (str | None): `None` でない場合、名前が :const:`~CALLINSHOW` になり、\ | ||
クエリと終わりにするツイートが無しになる。 | クエリと終わりにするツイートが無しになる。\ | ||
更に空文字でもない場合、この引数が終わりにするツイートになる。 | |||
Returns: | Returns: | ||
1,074行目: | 1,013行目: | ||
# ユーザー名取得 | # ユーザー名取得 | ||
if krsw: | if krsw is not None: | ||
logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます') | logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます') | ||
self._name: str = self.CALLINSHOW | self._name: str = self.CALLINSHOW | ||
1,086行目: | 1,025行目: | ||
# 検索クエリとページ取得 | # 検索クエリとページ取得 | ||
self._query_strs: list[str] = [] | self._query_strs: list[str] = [] | ||
if krsw: | if krsw is not None: | ||
logger.info('クエリは自動的になしにナリます') | logger.info('クエリは自動的になしにナリます') | ||
else: | else: | ||
1,107行目: | 1,046行目: | ||
# 終わりにするツイート取得 | # 終わりにするツイート取得 | ||
if krsw: | if krsw == '': | ||
logger.info('終わりにするツイートは自動的になしにナリます') | logger.info('終わりにするツイートは自動的になしにナリます') | ||
self._stop: str = '' | |||
elif krsw is not None: | |||
logger.info(f'終わりにするツイートは自動的に"{krsw}"にナリます') | |||
self._stop: str = krsw | |||
else: | |||
self._stop: str = self._input_stop_word() | |||
logger.info( | logger.info( | ||
1,170行目: | 1,114行目: | ||
try: | try: | ||
accessor.request_once(self.NITTER_INSTANCE) | accessor.request_once(self.NITTER_INSTANCE) | ||
except AccessError | except AccessError: | ||
logger.critical('インスタンスが死んでますを', exc_info=True) | |||
logger.critical('インスタンスが死んでますを') | |||
sys.exit(1) | sys.exit(1) | ||
logger.info('Nitter OK') | logger.info('Nitter OK') | ||
1,188行目: | 1,131行目: | ||
try: | try: | ||
accessor.request_once(self.ARCHIVE_TODAY) | accessor.request_once(self.ARCHIVE_TODAY) | ||
except AccessError | except AccessError: # エラー発生時は終了 | ||
logger.critical('インスタンスが死んでますを', exc_info=True) | |||
logger.critical('インスタンスが死んでますを') | |||
sys.exit(1) | sys.exit(1) | ||
logger.info('archive.today OK') | logger.info('archive.today OK') | ||
def _invidious_instances( | def _invidious_instances( | ||
self, accessor: AccessorHandler | |||
) -> tuple[str, ...]: | |||
"""Invidiousのインスタンスのタプルを取得する。 | """Invidiousのインスタンスのタプルを取得する。 | ||
1,239行目: | 1,182行目: | ||
+ 'になりますを') | + 'になりますを') | ||
print('> ', end='') | print('> ', end='') | ||
account_str: | account_str: str = input() | ||
# 空欄で降臨ショー | # 空欄で降臨ショー | ||
if account_str == '': | if account_str == '': | ||
return self.CALLINSHOW | return self.CALLINSHOW | ||
else: | else: | ||
res: | res: str | None = accessor.request( | ||
urljoin(self.NITTER_INSTANCE, account_str)) | urljoin(self.NITTER_INSTANCE, account_str)) | ||
if res is None: # リクエスト失敗判定 | if res is None: # リクエスト失敗判定 | ||
self._on_fail(accessor) | self._on_fail(accessor) | ||
return None | return None | ||
soup: | soup: BeautifulSoup = BeautifulSoup(res, 'html.parser') | ||
if soup.title == self.NITTER_ERROR_TITLE: | if soup.title == self.NITTER_ERROR_TITLE: | ||
print(account_str + 'は実在の人物ではありませんでした') | print(account_str + 'は実在の人物ではありませんでした') | ||
1,258行目: | 1,201行目: | ||
def _input_query(self) -> None: | def _input_query(self) -> None: | ||
"""検索クエリを標準入力から取得する。 | """検索クエリを標準入力から取得する。""" | ||
print('検索クエリを入れるナリ。複数ある時は一つの塊ごとに入力するナリ。空欄を入れると検索開始ナリ。') | print('検索クエリを入れるナリ。複数ある時は一つの塊ごとに入力するナリ。空欄を入れると検索開始ナリ。') | ||
print('例:「無能」→改行→「脱糞」→改行→「弟殺し」→改行→空欄で改行') | print('例:「無能」→改行→「脱糞」→改行→「弟殺し」→改行→空欄で改行') | ||
1,301行目: | 1,243行目: | ||
def _download_media( | def _download_media( | ||
self, | |||
media_url: str, | |||
media_name: str, | |||
accessor: AccessorHandler | |||
) -> bool: | |||
"""ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | """ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | ||
Args: | Args: | ||
media_url (str): 画像のURL。 | media_url (str): 画像のURL。 | ||
media_name (str): 画像ファイル名。Nitter上のimgタグのsrc属性では、 | media_name (str): 画像ファイル名。Nitter上のimgタグのsrc属性では、\ | ||
``/pic/media%2F`` に後続する。 | ``/pic/media%2F`` に後続する。 | ||
accessor (AccessorHandler): アクセスハンドラ。 | accessor (AccessorHandler): アクセスハンドラ。 | ||
1,319行目: | 1,262行目: | ||
image_bytes: Final[bytes | None] = accessor.request_image(media_url) | image_bytes: Final[bytes | None] = accessor.request_image(media_url) | ||
if image_bytes is not None: | if image_bytes is not None: | ||
Path(self.MEDIA_DIR, media_name).write_bytes(image_bytes) | |||
return True | return True | ||
else: | else: | ||
1,326行目: | 1,268行目: | ||
def _download_m3u8( | def _download_m3u8( | ||
self, | |||
media_url: str, | |||
ts_filename: str | None, | |||
mp4_filename: str, | |||
proxies: dict[str, str] | None | |||
) -> FfmpegStatus: | |||
"""ffmpegで動画をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | """ffmpegで動画をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | ||
Args: | Args: | ||
media_url (str): 動画のm3u8/mp4ファイルのURL。 | |||
ts_filename (str): m3u8から取得したtsファイルのパス。 | ts_filename (str | None): m3u8から取得したtsファイルのパス。\ | ||
mp4_filename (str): | `None` の場合はtsファイルをダウンロードせず、直接mp4をダウンロードする。 | ||
proxies (dict[str, str] | None): | mp4_filename (str): 取得したmp4ファイルのパス。 | ||
proxies (dict[str, str] | None): ffmpegでの通信に用いるプロキシ設定。 | |||
Returns: | Returns: | ||
FfmpegStatus: ffmpegでの保存ステータス。 | FfmpegStatus: ffmpegでの保存ステータス。 | ||
""" | """ | ||
os.makedirs(self.MEDIA_DIR, exist_ok=True) | |||
if ts_filename is not None: | |||
ts_returncode: Final[int] = subprocess.run( | |||
[ | |||
'ffmpeg', '-y', | |||
'-http_proxy', 'proxies["http"]', | |||
'-i', media_url, | |||
'-c', 'copy', ts_filename | |||
] if proxies is not None else [ | |||
'ffmpeg', '-y', | |||
], | '-i', media_url, | ||
'-c', 'copy', ts_filename | |||
], | |||
stdout=subprocess.DEVNULL).returncode | |||
# 取得成功したらtsをmp4に変換 | |||
if ts_returncode == 0: | |||
ts2mp4_returncode: Final[int] = subprocess.run( | |||
[ | |||
'ffmpeg', '-y', '-i', ts_filename, | |||
'-acodec', 'copy', '-vcodec', 'copy', mp4_filename | |||
], | |||
stdout=subprocess.DEVNULL | |||
).returncode | |||
if ts2mp4_returncode == 0: | |||
return FfmpegStatus.MP4 | |||
else: | |||
return FfmpegStatus.TS | |||
else: | |||
return FfmpegStatus.FAILED | |||
else: | |||
returncode: Final[int] = subprocess.run( | |||
[ | [ | ||
'ffmpeg', '-y', '-i', | 'ffmpeg', '-y', | ||
'- | '-http_proxy', 'proxies["http"]', | ||
'-i', media_url, | |||
'-c', 'copy', mp4_filename | |||
] if proxies is not None else [ | |||
'ffmpeg', '-y', | |||
'-i', media_url, | |||
'-c', 'copy', mp4_filename | |||
], | ], | ||
stdout=subprocess.DEVNULL | stdout=subprocess.DEVNULL).returncode | ||
if returncode == 0: | |||
if | |||
return FfmpegStatus.MP4 | return FfmpegStatus.MP4 | ||
else: | else: | ||
return FfmpegStatus.FAILED | |||
def _tweet_date(self, url: str) -> datetime: | def _tweet_date(self, url: str) -> datetime: | ||
1,388行目: | 1,352行目: | ||
def _fetch_tweet_media( | def _fetch_tweet_media( | ||
self, | |||
tweet: Tag, | |||
tweet_url: str, | |||
accessor: AccessorHandler | |||
) -> str: | |||
"""ツイートの画像や動画を取得する。 | """ツイートの画像や動画を取得する。 | ||
1,411行目: | 1,376行目: | ||
for image_a in tweet_media.select('.attachment.image a'): | for image_a in tweet_media.select('.attachment.image a'): | ||
try: | try: | ||
href: | href: str | list[str] | None = image_a.get('href') | ||
assert isinstance(href, str) | assert isinstance(href, str) | ||
img_matched: | img_matched: re.Match[str] | None = ( | ||
self._img_ext_pattern.search(href)) | self._img_ext_pattern.search(href)) | ||
assert img_matched is not None | assert img_matched is not None | ||
media_name: | media_name: str = img_matched.group(1) | ||
media_list.append(f'[[ファイル:{media_name}|240px]]') | media_list.append(f'[[ファイル:{media_name}|240px]]') | ||
if self._download_media( | if self._download_media( | ||
urljoin(self.TWITTER_MEDIA_URL, media_name), | urljoin(self.TWITTER_MEDIA_URL, media_name), | ||
media_name, | media_name, | ||
accessor): | accessor): | ||
logger.info( | logger.info( | ||
os.path.join(self.MEDIA_DIR, media_name) | os.path.join(self.MEDIA_DIR, media_name) | ||
+ ' をアップロードしなければない。') | + ' をアップロードしなければない。') | ||
else: | else: | ||
logger.info( | logger.info( | ||
urljoin(self.TWITTER_MEDIA_URL, media_name) | urljoin(self.TWITTER_MEDIA_URL, media_name) | ||
+ ' をアップロードしなければない。') | + ' をアップロードしなければない。') | ||
except AttributeError: | except AttributeError: | ||
logger.error(f'{tweet_url}の画像が取得できませんでしたを 当職無能') | logger.error(f'{tweet_url}の画像が取得できませんでしたを 当職無能') | ||
media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]') | media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]') | ||
# ツイートの動画の取得。ffmpegが入っていなければ手動で取得すること | # ツイートの動画の取得。ffmpegが入っていなければ手動で取得すること | ||
for i, video_container in enumerate( | for i, video_container in enumerate( | ||
tweet_media.select('.attachment.video-container')): | tweet_media.select('.attachment.video-container')): | ||
if not self._has_ffmpeg: | if not self._has_ffmpeg: | ||
logger.warning(f'ffmpegがないため{tweet_url}の動画が取得できませんでしたを') | logger.warning(f'ffmpegがないため{tweet_url}の動画が取得できませんでしたを') | ||
media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]') | media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]') | ||
continue | continue | ||
# videoタグがない場合は取得できない | # videoタグがない場合は取得できない | ||
video: | video: Tag | None = video_container.select_one('video') | ||
if video is None: | if video is None: | ||
logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能') | logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能') | ||
media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]') | media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]') | ||
continue | continue | ||
data_url: | # videoタグのdata-url属性またはvideoタグ直下のsourceタグからURLが取得できる | ||
assert isinstance( | data_url: str | list[str] | None = video.get('data-url') | ||
video_matched: | source_tag: Tag | None = video.select_one('source') | ||
src_url: str | list[str] | None = \ | |||
source_tag.get('src') if source_tag is not None else None | |||
video_url: str | list[str] | None = data_url or src_url | |||
assert isinstance(video_url, str) | |||
tweet_id: str = tweet_url.split('/')[-1] | |||
if data_url is not None: | |||
# data-url属性からURLを取得した場合 | |||
mp4_filename: | video_matched: re.Match[str] | None = re.search( | ||
r'[^/]+$', video_url) | |||
assert video_matched is not None | |||
media_path: str = unquote(video_matched.group()) | |||
media_url: str = urljoin(self.NITTER_INSTANCE, media_path) | |||
ts_filename: str | None = ( | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts' | |||
) | |||
else: | |||
# sourceタグからURLを取得した場合 | |||
media_url: str = video_url | |||
ts_filename: str | None = None | |||
mp4_filename: str = ( | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4' | f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4' | ||
) | ) | ||
match self._download_m3u8( | match self._download_m3u8( | ||
media_url, | |||
ts_filename, | ts_filename, | ||
mp4_filename, | mp4_filename, | ||
1,527行目: | 1,504行目: | ||
assert poll_info is not None | assert poll_info is not None | ||
for poll_meter in poll_meters: | for poll_meter in poll_meters: | ||
poll_choice_value: | poll_choice_value: Tag | None = poll_meter.select_one( | ||
'.poll-choice-value') | '.poll-choice-value') | ||
assert poll_choice_value is not None | assert poll_choice_value is not None | ||
ratio: | ratio: str = poll_choice_value.text | ||
poll_choice_option: | poll_choice_option: Tag | None = poll_meter.select_one( | ||
'.poll-choice-option') | '.poll-choice-option') | ||
assert poll_choice_option is not None | assert poll_choice_option is not None | ||
1,590行目: | 1,567行目: | ||
urls_in_tweet: Final[ResultSet[Tag]] = tag.find_all('a') | urls_in_tweet: Final[ResultSet[Tag]] = tag.find_all('a') | ||
for url in urls_in_tweet: | for url in urls_in_tweet: | ||
href: | href: str | list[str] | None = url.get('href') | ||
assert isinstance(href, str) | assert isinstance(href, str) | ||
1,610行目: | 1,587行目: | ||
and self._invidious_pattern.search(href)): | and self._invidious_pattern.search(href)): | ||
# Nitter上のYouTubeへのリンクをInvidiousのものから直す | # Nitter上のYouTubeへのリンクをInvidiousのものから直す | ||
invidious_href: | invidious_href: str | list[str] | None = ( | ||
self._invidious_pattern.sub( | self._invidious_pattern.sub( | ||
'youtube.com' if ( | 'youtube.com' if ( | ||
1,630行目: | 1,607行目: | ||
elif url.text.startswith('@'): | elif url.text.startswith('@'): | ||
url_link: str = urljoin(self.TWITTER_URL, href) | url_link: str = urljoin(self.TWITTER_URL, href) | ||
url_text: | url_text: str = url.text | ||
url.replace_with( | url.replace_with( | ||
self._archive_url( | self._archive_url( | ||
1,636行目: | 1,613行目: | ||
def _archive_url( | def _archive_url( | ||
self, | |||
url: str, | |||
accessor: AccessorHandler, | |||
text: str | None = None | |||
) -> str: | |||
"""URLをArchiveテンプレートでラップする。 | """URLをArchiveテンプレートでラップする。 | ||
1,677行目: | 1,655行目: | ||
取得できれば魚拓ページのURLを返す。 | 取得できれば魚拓ページのURLを返す。 | ||
魚拓がなければその旨の警告を表示し、https://archive.ph/https%3A%2F% | 魚拓がなければその旨の警告を表示し、https://archive.ph/https%3A%2F%2Fxxxxxxxx の形で返される。 | ||
アクセスに失敗すればその旨の警告を表示し、https://archive.ph/https%3A%2F% | アクセスに失敗すればその旨の警告を表示し、https://archive.ph/https%3A%2F%2Fxxxxxxxx の形で返される。 | ||
Args: | Args: | ||
1,704行目: | 1,682行目: | ||
else: | else: | ||
assert isinstance(content, Tag) | assert isinstance(content, Tag) | ||
content_a: Final[Tag | NavigableString | None] = content. | content_a: Final[Tag | NavigableString | None] = ( | ||
content.select_one('.TEXT-BLOCK > a')) # 最新の魚拓を取得 | |||
assert isinstance(content_a, Tag) | assert isinstance(content_a, Tag) | ||
href: Final[str | list[str] | None] = content_a.get('href') | href: Final[str | list[str] | None] = content_a.get('href') | ||
1,729行目: | 1,707行目: | ||
tweets: Final[list[Tag]] = self._get_timeline_items(soup) | tweets: Final[list[Tag]] = self._get_timeline_items(soup) | ||
for tweet in tweets: | for tweet in tweets: | ||
tweet_a: | tweet_a: Tag | None = tweet.a | ||
assert tweet_a is not None | assert tweet_a is not None | ||
if tweet_a.text == self.NEWEST: | if tweet_a.text == self.NEWEST: | ||
1,750行目: | 1,728行目: | ||
continue | continue | ||
tweet_link: | tweet_link: Tag | NavigableString | None = tweet.find( | ||
class_='tweet-link') | class_='tweet-link') | ||
assert isinstance(tweet_link, Tag) | assert isinstance(tweet_link, Tag) | ||
href: | href: str | list[str] | None = tweet_link.get('href') | ||
assert isinstance(href, str) | assert isinstance(href, str) | ||
tweet_url: | tweet_url: str = urljoin( | ||
self.TWITTER_URL, | self.TWITTER_URL, | ||
self._url_fragment_pattern.sub('', href)) | self._url_fragment_pattern.sub('', href)) | ||
# 日付の更新処理 | # 日付の更新処理 | ||
date: | date: datetime = self._tweet_date(tweet_url) | ||
self._table_builder.next_day_if_necessary(date) | self._table_builder.next_day_if_necessary(date) | ||
tweet_callinshow_template: | tweet_callinshow_template: str = self._callinshowlink_url( | ||
tweet_url, accessor) | tweet_url, accessor) | ||
tweet_content: | tweet_content: Tag | NavigableString | None = tweet.find( | ||
class_='tweet-content media-body') | class_='tweet-content media-body') | ||
assert isinstance(tweet_content, Tag) | assert isinstance(tweet_content, Tag) | ||
self._archive_soup(tweet_content, accessor) | self._archive_soup(tweet_content, accessor) | ||
media_txt: | media_txt: str = self._fetch_tweet_media( | ||
tweet, tweet_url, accessor) | tweet, tweet_url, accessor) | ||
quote_txt: | quote_txt: str = self._get_tweet_quote(tweet, accessor) | ||
poll_txt: | poll_txt: str = self._get_tweet_poll(tweet) | ||
self._table_builder.append( | self._table_builder.append( | ||
tweet_callinshow_template, '<br>\n'.join( | tweet_callinshow_template, '<br>\n'.join( | ||
1,814行目: | 1,792行目: | ||
new_url: str = '' | new_url: str = '' | ||
for show_more in show_mores: # show-moreに次ページへのリンクか前ページへのリンクがある | for show_more in show_mores: # show-moreに次ページへのリンクか前ページへのリンクがある | ||
show_more_a: | show_more_a: Tag | None = show_more.a | ||
assert show_more_a is not None | assert show_more_a is not None | ||
href: | href: str | list[str] | None = show_more_a.get('href') | ||
assert isinstance(href, str) | assert isinstance(href, str) | ||
new_url = urljoin( | new_url = urljoin( | ||
1,841行目: | 1,819行目: | ||
def _signal_handler( | def _signal_handler( | ||
self, signum: int, frame: FrameType | None | |||
) -> NoReturn: | |||
"""ユーザがCtrl + Cでプログラムを止めたときのシグナルハンドラ。 | """ユーザがCtrl + Cでプログラムを止めたときのシグナルハンドラ。 | ||
1,858行目: | 1,837行目: | ||
'See also: https://nitter.cz' | 'See also: https://nitter.cz' | ||
'\033[0m') | '\033[0m') | ||
def execute(self, krsw: | def execute( | ||
self, | |||
krsw: str | None = None, | |||
use_browser: bool = True, | |||
enable_javascript: bool = True | |||
) -> None: | |||
"""通信が必要な部分のロジック。 | """通信が必要な部分のロジック。 | ||
Args: | Args: | ||
krsw ( | krsw (str | None, optional): `None` でない場合、名前が自動で \ | ||
:const:`~CALLINSHOW` になり、クエリと終わりにするツイートが自動で無しになる。\ | |||
use_browser (bool, optional): `True` ならSeleniumを利用する。 | 更に空文字でもない場合、この引数が終わりにするツイートになる。 | ||
use_browser (bool, optional): `True` ならSeleniumを利用する。\ | |||
`False` ならRequestsのみでアクセスする。 | `False` ならRequestsのみでアクセスする。 | ||
enable_javascript (bool, optional): SeleniumでJavaScriptを利用する場合は | enable_javascript (bool, optional): SeleniumでJavaScriptを利用する場合は \ | ||
`True`。 | `True`。 | ||
1,884行目: | 1,868行目: | ||
self._invidious_instances(accessor) | self._invidious_instances(accessor) | ||
) | ) | ||
self._invidious_pattern: Pattern[str] = re.compile( | self._invidious_pattern: re.Pattern[str] = re.compile( | ||
'|'.join(invidious_url_tuple)) | '|'.join(invidious_url_tuple)) | ||
1,899行目: | 1,883行目: | ||
if not self._go_to_new_page(accessor): | if not self._go_to_new_page(accessor): | ||
break | break | ||
except BaseException | except BaseException: | ||
logger.critical('予想外のエラーナリ。ここまでの成果をダンプして終了するナリ。') | logger.critical('予想外のエラーナリ。ここまでの成果をダンプして終了するナリ。') | ||
self._table_builder.dump_file() | self._table_builder.dump_file() | ||
raise | raise | ||
class UrlTuple(NamedTuple): | class UrlTuple(NamedTuple): | ||
"""URLとその魚拓のURLのペア。 | """URLとその魚拓のURLのペア。""" | ||
url: str | url: str | ||
"""URL。 | """URL。""" | ||
archive_url: str | archive_url: str | ||
"""魚拓のURL。 | """魚拓のURL。""" | ||
1,929行目: | 1,910行目: | ||
* ちゃんとテストする。 | * ちゃんとテストする。 | ||
""" | """ | ||
WIKI_URL: Final[str] = 'https://krsw-wiki.in' | |||
"""Final[str]: WikiのURL。""" | |||
TEMPLATE_URL: Final[str] = ' | TEMPLATE_URL: Final[str] = WIKI_URL + '/wiki/テンプレート:降臨ショー恒心ログ' | ||
"""Final[str]: テンプレート:降臨ショー恒心ログのURL。 | """Final[str]: テンプレート:降臨ショー恒心ログのURL。""" | ||
URL_LIST_FILENAME: Final[str] = \ | URL_LIST_FILENAME: Final[str] = \ | ||
UserProperties.ArchiveCrawler.url_list_filename | UserProperties.ArchiveCrawler.url_list_filename | ||
"""Final[str]: URLのリストをダンプするファイル名。 | """Final[str]: URLのリストをダンプするファイル名。""" | ||
@override | @override | ||
def _set_queries(self, accessor: AccessorHandler, krsw: | def _set_queries( | ||
self, accessor: AccessorHandler, krsw: str | None | |||
) -> bool: | |||
"""検索条件を設定する。 | """検索条件を設定する。 | ||
1,948行目: | 1,931行目: | ||
Args: | Args: | ||
accessor (AccessorHandler): アクセスハンドラ | accessor (AccessorHandler): アクセスハンドラ | ||
krsw ( | krsw (str | None): `None` でない場合、名前が :const:`~CALLINSHOW` になる。 | ||
Returns: | Returns: | ||
1,955行目: | 1,938行目: | ||
# ユーザー名取得 | # ユーザー名取得 | ||
if krsw: | if krsw is not None: | ||
logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます') | logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます') | ||
self._name: str = self.CALLINSHOW | self._name: str = self.CALLINSHOW | ||
1,976行目: | 1,959行目: | ||
+ ', 最古のURL: ' + self._oldest_url + '*' + 'で検索しまふ' | + ', 最古のURL: ' + self._oldest_url + '*' + 'で検索しまふ' | ||
) | ) | ||
self._twitter_url_pattern: Pattern[str] = re.compile( | self._twitter_url_pattern: re.Pattern[str] = re.compile( | ||
'^' + self.TWITTER_URL + self._name + r'/status/\d+') | '^' + self.TWITTER_URL + self._name + r'/status/\d+') | ||
self._archive_rt_pattern: Pattern[str] = re.compile( | self._archive_rt_pattern: re.Pattern[str] = re.compile( | ||
r'on (?:Twitter|X): "RT @\w+:.+"(?:$| / Twitter$| / X$)') | r'on (?:Twitter|X): "RT @\w+:.+"(?:$| / Twitter$| / X$)') | ||
2,033行目: | 2,016行目: | ||
urls: Final[list[str]] = list(map( | urls: Final[list[str]] = list(map( | ||
lambda x: | lambda x: self.WIKI_URL + assert_get(x, 'href'), | ||
template_soup.select('.wikitable > tbody > tr > td a'))) | template_soup.select('.wikitable > tbody > tr > td a'))) | ||
for url in urls: | for url in urls: | ||
logger.info(f'{unquote(url)} で収集済みツイートを探索中でふ') | logger.info(f'{unquote(url)} で収集済みツイートを探索中でふ') | ||
page: | page: str | None = accessor.request_with_requests_module(url) | ||
assert page is not None | assert page is not None | ||
soup: | soup: BeautifulSoup = BeautifulSoup(page, 'html.parser') | ||
url_as = soup.select('tr > th a') | url_as = soup.select('tr > th a') | ||
assert len(url_as) > 0 | assert len(url_as) > 0 | ||
2,061行目: | 2,043行目: | ||
for tweet in tweets: | for tweet in tweets: | ||
# リダイレクトがあればすべてのリンクを、ないなら目的のURLだけが取得できる | # リダイレクトがあればすべてのリンクを、ないなら目的のURLだけが取得できる | ||
urls: | urls: list[str] = list(map( | ||
lambda a: a.text, tweet.select('a')[1:])) | lambda a: a.text, tweet.select('a')[1:])) | ||
# 別のユーザへのリダイレクトがあるものは除く | # 別のユーザへのリダイレクトがあるものは除く | ||
2,068行目: | 2,050行目: | ||
continue | continue | ||
url_matched: Final[Match[str]] | None = next( | url_matched: Final[re.Match[str]] | None = next( | ||
filter( | filter( | ||
lambda x: x is not None, | lambda x: x is not None, | ||
2,078行目: | 2,060行目: | ||
logger.debug(url_matched.string + 'は最古の探索対象よりも古いのでポア') | logger.debug(url_matched.string + 'は最古の探索対象よりも古いのでポア') | ||
continue | continue | ||
a_first_child: | a_first_child: Tag | None = tweet.select_one('a:first-child') | ||
assert a_first_child is not None | assert a_first_child is not None | ||
archive_url: | archive_url: str | list[str] | None = a_first_child.get('href') | ||
assert isinstance(archive_url, str) | assert isinstance(archive_url, str) | ||
if url_matched[0] not in self._url_list_on_wiki: | if url_matched[0] not in self._url_list_on_wiki: | ||
2,098行目: | 2,078行目: | ||
def _fetch_next_page( | def _fetch_next_page( | ||
self, soup: Tag, accessor: AccessorHandler | |||
) -> str | None: | |||
"""archive.todayの検索結果のページをpaginateする。 | """archive.todayの検索結果のページをpaginateする。 | ||
2,128行目: | 2,109行目: | ||
while has_next: | while has_next: | ||
self._append_tweet_urls(soup) | self._append_tweet_urls(soup) | ||
next_page: | next_page: str | None = self._fetch_next_page(soup, accessor) | ||
if next_page is not None: | if next_page is not None: | ||
soup = BeautifulSoup(next_page, 'html.parser') | soup = BeautifulSoup(next_page, 'html.parser') | ||
2,136行目: | 2,116行目: | ||
def _next_url( | def _next_url( | ||
self, | |||
accessor: AccessorHandler, | |||
tweet_url_prefix: str, | |||
incremented_num: int, | |||
incremented: bool = False | |||
) -> None: | |||
"""ツイートのURLを、数字部分をインクリメントしながら探索する。 | """ツイートのURLを、数字部分をインクリメントしながら探索する。 | ||
2,180行目: | 2,161行目: | ||
pager: Final[Tag | None] = soup.select_one('#pager') | pager: Final[Tag | None] = soup.select_one('#pager') | ||
if pager is not None: # 検索結果が複数ページ | if pager is not None: # 検索結果が複数ページ | ||
page_num_matched: Final[Match[str] | None] = re.search( | page_num_matched: Final[re.Match[str] | None] = re.search( | ||
r'of (\d+) urls', pager.text) | r'of (\d+) urls', pager.text) | ||
assert page_num_matched is not None | assert page_num_matched is not None | ||
2,218行目: | 2,199行目: | ||
True) | True) | ||
def _parse_images(self, soup: Tag, | def _parse_images( | ||
self, soup: Tag, accessor: AccessorHandler | |||
) -> tuple[str, ...]: | |||
"""ツイートの魚拓から画像をダウンロードし、ファイル名のタプルを返す。 | """ツイートの魚拓から画像をダウンロードし、ファイル名のタプルを返す。 | ||
2,271行目: | 2,253行目: | ||
if len(internal_a_tags) > 0: | if len(internal_a_tags) > 0: | ||
for a_tag in internal_a_tags: | for a_tag in internal_a_tags: | ||
account_name: | account_name: str = a_tag.text | ||
if account_name.startswith('#'): | if account_name.startswith('#'): | ||
a_tag.replace_with(a_tag.text) | a_tag.replace_with(a_tag.text) | ||
else: | else: | ||
url: | url: str = urljoin(self.TWITTER_URL, account_name[1:]) | ||
a_tag.replace_with(TableBuilder.archive_url( | a_tag.replace_with(TableBuilder.archive_url( | ||
url, | url, | ||
2,284行目: | 2,265行目: | ||
# 通常のリンク | # 通常のリンク | ||
a_tags: Final[ResultSet[Tag]] = tag.select( | a_tags: Final[ResultSet[Tag]] = tag.select( | ||
'div[ | 'div[dir="auto"] > a:not(' | ||
'div[role="link"] div[ | 'div[role="link"] div[dir="auto"] > a)') | ||
for a_tag in a_tags: | for a_tag in a_tags: | ||
a_tag.replace_with( | a_tag.replace_with( | ||
2,372行目: | 2,353行目: | ||
def _get_tweet_from_archive( | def _get_tweet_from_archive( | ||
self, | |||
url_pairs: list[UrlTuple], | |||
accessor: AccessorHandler | |||
) -> None: | |||
"""魚拓からツイート本文を取得する。 | """魚拓からツイート本文を取得する。 | ||
2,384行目: | 2,366行目: | ||
for url_pair in url_pairs: | for url_pair in url_pairs: | ||
page: | page: str | None = accessor.request(url_pair.archive_url) | ||
assert page is not None | assert page is not None | ||
article: | article: Tag | None = BeautifulSoup( | ||
page, 'html.parser' | page, 'html.parser' | ||
).select_one('article[tabindex="-1"]') | ).select_one('article[tabindex="-1"]') | ||
2,394行目: | 2,376行目: | ||
logger.debug(url_pair.url + 'を整形しますを') | logger.debug(url_pair.url + 'を整形しますを') | ||
tweet_date: | tweet_date: datetime = self._tweet_date(url_pair.url) | ||
table_builder.next_day_if_necessary(tweet_date) | table_builder.next_day_if_necessary(tweet_date) | ||
tweet_callinshow_template: str = ( | tweet_callinshow_template: str = ( | ||
TableBuilder.callinshowlink_url( | |||
url_pair.url, url_pair.archive_url.replace( | |||
self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD))) | |||
# 本文 | # 本文 | ||
try: | try: | ||
text_tag: Tag | None = article.select_one( | text_tag: Tag | None = article.select_one( | ||
'div[ | 'div[dir="auto"]') | ||
if text_tag is not None: | if text_tag is not None: | ||
text_tag = self._retrieve_emojis(text_tag) | text_tag = self._retrieve_emojis(text_tag) | ||
2,413行目: | 2,396行目: | ||
# YouTube等のリンク | # YouTube等のリンク | ||
card_tag: | card_tag: Tag | None = article.select_one( | ||
'div[ | 'div[aria-label="Play"]:not(div[role="link"] ' | ||
'div[aria-label="Play"])') | |||
'div[ | |||
if card_tag is not None: | if card_tag is not None: | ||
text = self._concat_texts( | text = self._concat_texts( | ||
2,423行目: | 2,405行目: | ||
# 画像に埋め込まれた外部サイトへのリンク | # 画像に埋め込まれた外部サイトへのリンク | ||
article_tag: | article_tag: Tag | None = article.select_one( | ||
' | 'a[role="link"][aria-label] img:not(div[role="link"] ' | ||
'a[role="link"][aria-label] img)') | |||
if article_tag is not None: | if article_tag is not None: | ||
text = self._concat_texts( | text = self._concat_texts( | ||
2,431行目: | 2,414行目: | ||
# 引用の有無のチェック | # 引用の有無のチェック | ||
retweet_tag: | retweet_tag: Tag | None = article.select_one( | ||
' | 'div[role="link"]') | ||
if retweet_tag is not None: | if retweet_tag is not None: | ||
account_name_tag: | account_name_tag: Tag | None = ( | ||
retweet_tag.select_one( | retweet_tag.select_one( | ||
' | 'div[tabindex="-1"] > div > span:not(:has(> *))')) # noqa: E501 | ||
assert account_name_tag is not None | assert account_name_tag is not None | ||
text = self._concat_texts( | text = self._concat_texts( | ||
2,452行目: | 2,435行目: | ||
# 投票の処理 | # 投票の処理 | ||
poll_txt: | poll_txt: str = self._get_tweet_poll(article) | ||
text = self._concat_texts(text, poll_txt) | text = self._concat_texts(text, poll_txt) | ||
# バージョンの処理 | # バージョンの処理 | ||
possible_version_text_tags: ResultSet[Tag] = ( | |||
article.select( | |||
'div > span > ' | |||
'span:not(:has(> *)):not(div[role="link"] div > ' | |||
'span > span:not(:has(> *)))')) | |||
if len(list(filter( | |||
lambda x: x.text | |||
== 'There’s a new version of this post.', | |||
possible_version_text_tags))) > 0: | |||
tweet_callinshow_template += '([[新しいバージョンがあります|後の版→]])' | tweet_callinshow_template += '([[新しいバージョンがあります|後の版→]])' | ||
except Exception as e: | except Exception as e: | ||
# エラーが起きても止めない | # エラーが起きても止めない | ||
logger.exception( | logger.exception('エラーが発生してツイートが取得できませんでしたを', exc_info=True) | ||
text = 'エラーが発生してツイートが取得できませんでした\n' + ''.join( | text = 'エラーが発生してツイートが取得できませんでした\n' + ''.join( | ||
TracebackException.from_exception(e).format()) | TracebackException.from_exception(e).format()) | ||
table_builder.append(tweet_callinshow_template, text) | table_builder.append(tweet_callinshow_template, text) | ||
else: | else: | ||
logger. | logger.warning(url_pair.url + 'はリツイートなので飛ばすナリ。' | ||
'URLリスト収集の時点でフィルタできなかった、これはいけない。') | |||
table_builder.dump_file() | table_builder.dump_file() | ||
@override | @override | ||
def execute(self, krsw: | def execute( | ||
self, | |||
krsw: str | None = None, | |||
use_browser: bool = True, | |||
enable_javascript: bool = True | |||
) -> None: | |||
"""通信が必要な部分のロジック。 | """通信が必要な部分のロジック。 | ||
Args: | Args: | ||
krsw ( | krsw (str | None, optional): `None` でない場合、名前が自動で \ | ||
:const:`~CALLINSHOW` になる。 | |||
use_browser (bool, optional): `True` ならSeleniumを利用する。 | use_browser (bool, optional): `True` ならSeleniumを利用する。\ | ||
`False` ならRequestsのみでアクセスする。 | `False` ならRequestsのみでアクセスする。 | ||
enable_javascript (bool, optional): SeleniumでJavaScriptを利用する場合は | enable_javascript (bool, optional): SeleniumでJavaScriptを利用する場合は \ | ||
`True`。 | `True`。 | ||
""" | """ | ||
2,502行目: | 2,495行目: | ||
sys.exit(1) | sys.exit(1) | ||
# Wikiに既に掲載されているツイートのURLを取得 | # Wikiに既に掲載されているツイートのURLを取得 | ||
self._get_tweet_urls_from_wiki(accessor) | self._get_tweet_urls_from_wiki(accessor) # Wikiに接続できない時はここをコメントアウト | ||
# 未掲載のツイートのURLを取得する | # 未掲載のツイートのURLを取得する | ||
2,509行目: | 2,502行目: | ||
# URL一覧ファイルのダンプ | # URL一覧ファイルのダンプ | ||
with | with Path(self.URL_LIST_FILENAME).open('w', encoding='utf-8') as f: | ||
for url_pair in self._url_list: | for url_pair in self._url_list: | ||
f.write(url_pair.url + '\n') | f.write(url_pair.url + '\n') | ||
2,532行目: | 2,525行目: | ||
parser.add_argument( | parser.add_argument( | ||
'--krsw', | '--krsw', | ||
type=str, | |||
help='指定すると、パカデブのツイートを取得上限数まで取得する。') | nargs='?', | ||
const='', | |||
default=None, | |||
help=('指定すると、パカデブのツイートを取得上限数まで取得する。' | |||
'更に--search-unarchivedモードでこのオプションに引数を与えると、' | |||
'その文言が終わりにするツイートになる。')) | |||
parser.add_argument( | parser.add_argument( | ||
'-n', | '-n', |
回編集