「利用者:夜泣き/スクリプト」の版間の差分
>Fet-Fe (→コード: v4.3.7.post1 コメント修正) |
(→コード: v4.4.2 WikiのURLをkrsw-wiki.inに変更) |
||
(他の1人の利用者による、間の2版が非表示) | |||
11行目: | 11行目: | ||
"""Twitter自動収集スクリプト | """Twitter自動収集スクリプト | ||
ver4. | ver4.4.2 2024/11/9恒心 | ||
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です。 | 当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です。 | ||
19行目: | 19行目: | ||
Examples: | Examples: | ||
定数類は状況に応じて変えてください。:class:`~UserProperties` で変更できます。 | 定数類は状況に応じて変えてください。:class:`~UserProperties` で変更できます。 | ||
:: | :: | ||
24行目: | 25行目: | ||
オプションに ``--krsw`` とつけると自動モードになります。 | オプションに ``--krsw`` とつけると自動モードになります。 | ||
:: | :: | ||
33行目: | 35行目: | ||
``--no-browser`` オプションでTor Browserを使用しないモードに、 | ``--no-browser`` オプションでTor Browserを使用しないモードに、 | ||
``--disable-script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。 | ``--disable-script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。 | ||
``--search-unarchived`` | ``--search-unarchived`` オプションでは、従来の通りNitterからツイートを収集するモードになります (廃止予定)。 | ||
Note: | Note: | ||
49行目: | 51行目: | ||
* requests, typing_extensionsも環境によっては入っていないかもしれない | * requests, typing_extensionsも環境によっては入っていないかもしれない | ||
.. code-block:: bash | |||
$ python3 -m pip install bs4 requests PySocks selenium typing_extensions | |||
* pipも入っていなければ ``$ sudo apt install pip`` | * pipも入っていなければ ``$ sudo apt install pip`` | ||
* `ffmpeg <https://ffmpeg.org>`_ が入っていると動画も自動取得しますが、無くても動きます | * `ffmpeg <https://ffmpeg.org>`_ が入っていると動画も自動取得しますが、無くても動きます | ||
* バグ報告は `利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて \ | * バグ報告は `利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて \ | ||
<https://krsw-wiki. | <https://krsw-wiki.in/wiki/利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて>`_ | ||
""" | """ | ||
import json | import json | ||
import logging | import logging | ||
76行目: | 79行目: | ||
from datetime import datetime | from datetime import datetime | ||
from enum import Enum | from enum import Enum | ||
from pathlib import Path | |||
from time import sleep | from time import sleep | ||
from traceback import TracebackException | from traceback import TracebackException | ||
250行目: | 254行目: | ||
def __init__(self) -> None: | def __init__(self) -> None: | ||
# Torに必要なプロキシをセット | # Torに必要なプロキシをセット | ||
self._cookies: dict[str, str] = {} | self._cookies: dict[str, str] = {} | ||
355行目: | 358行目: | ||
else: | else: | ||
raise AccessError('サイトにTorのIPでアクセスできていないなりを') | raise AccessError('サイトにTorのIPでアクセスできていないなりを') | ||
except requests.exceptions.ConnectionError | except requests.exceptions.ConnectionError: | ||
logger.critical('通信がTorのSOCKS proxyを経由していないなりを', exc_info=True) | |||
logger.critical('通信がTorのSOCKS proxyを経由していないなりを') | |||
sys.exit(1) | sys.exit(1) | ||
@property | @property | ||
def proxies(self) -> dict[str, str] | None: | def proxies(self) -> dict[str, str] | None: | ||
""" | """dict[str, str] | None: プロキシ設定。""" | ||
return self._proxies | return self._proxies | ||
class SeleniumAccessor(AbstractAccessor): | class SeleniumAccessor(AbstractAccessor): | ||
"""SeleniumでWebサイトに接続するためのクラス。""" | """SeleniumでWebサイトに接続するためのクラス。 | ||
Args: | |||
enable_javascript (bool): JavaScriptを有効にするかどうか。reCAPTCHA対策に必要。 | |||
""" | |||
TOR_BROWSER_PATHS: Final[MappingProxyType[str, str]] = MappingProxyType({ | TOR_BROWSER_PATHS: Final[MappingProxyType[str, str]] = MappingProxyType({ | ||
387行目: | 389行目: | ||
def __init__(self, enable_javascript: bool) -> None: | def __init__(self, enable_javascript: bool) -> None: | ||
self._options: Final[FirefoxOptions] = FirefoxOptions() | self._options: Final[FirefoxOptions] = FirefoxOptions() | ||
self._options.binary_location = ( | self._options.binary_location = ( | ||
521行目: | 516行目: | ||
RequestsとSeleniumのどちらかを選択して使用することができ、その違いを隠蔽する。 | RequestsとSeleniumのどちらかを選択して使用することができ、その違いを隠蔽する。 | ||
Args: | |||
use_browser (bool): `True` ならSeleniumを利用する。`False` ならRequestsのみでアクセスする。 | |||
enable_javascript (bool): SeleniumでJavaScriptを利用する場合は `True`。 | |||
""" | """ | ||
530行目: | 529行目: | ||
def __init__(self, use_browser: bool, enable_javascript: bool) -> None: | def __init__(self, use_browser: bool, enable_javascript: bool) -> None: | ||
self._selenium_accessor: Final[SeleniumAccessor | None] = ( | self._selenium_accessor: Final[SeleniumAccessor | None] = ( | ||
SeleniumAccessor(enable_javascript) if use_browser else None | SeleniumAccessor(enable_javascript) if use_browser else None | ||
546行目: | 536行目: | ||
def __enter__(self) -> Self: | def __enter__(self) -> Self: | ||
return self | return self | ||
557行目: | 542行目: | ||
exc_value: BaseException | None, | exc_value: BaseException | None, | ||
traceback: TracebackType | None) -> None: | traceback: TracebackType | None) -> None: | ||
if self._selenium_accessor is not None: | if self._selenium_accessor is not None: | ||
self._selenium_accessor.quit() | self._selenium_accessor.quit() | ||
591行目: | 569行目: | ||
def _request_with_callable[T]( | def _request_with_callable[T]( | ||
self, | |||
url: str, | |||
request_callable: Callable[[str], T] | |||
) -> T | None: | |||
"""`request_callable` の実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | """`request_callable` の実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | ||
612行目: | 591行目: | ||
for i in range(self.LIMIT_N_REQUESTS): | for i in range(self.LIMIT_N_REQUESTS): | ||
try: | try: | ||
res: | res: T = request_callable(url) | ||
except AccessError: | except AccessError: | ||
logger.warning( | logger.warning( | ||
url + 'への通信失敗ナリ ' | url + 'への通信失敗ナリ ' | ||
f'{i + 1}/{self.LIMIT_N_REQUESTS}回') | f'{i + 1}/{self.LIMIT_N_REQUESTS}回') | ||
logger.debug('エラーログ', exc_info=True) | |||
if i < self.LIMIT_N_REQUESTS: | if i < self.LIMIT_N_REQUESTS: | ||
sleep(self.WAIT_TIME_FOR_ERROR) # 失敗時は長めに待つ | sleep(self.WAIT_TIME_FOR_ERROR) # 失敗時は長めに待つ | ||
688行目: | 668行目: | ||
@property | @property | ||
def last_url(self) -> str: | def last_url(self) -> str: | ||
""" | """str: 最後にアクセスしたURL。""" | ||
return self._last_url | return self._last_url | ||
@property | @property | ||
def proxies(self) -> dict[str, str] | None: | def proxies(self) -> dict[str, str] | None: | ||
""" | """dict[str, str] | None: RequestsAccessorオブジェクトのプロキシ設定。""" | ||
return self._requests_accessor.proxies | return self._requests_accessor.proxies | ||
class TableBuilder: | class TableBuilder: | ||
"""Wikiの表を組み立てるためのクラス。 | """Wikiの表を組み立てるためのクラス。 | ||
Args: | |||
date (datetime | None, optional): 記録するツイートの最新日付。デフォルトは今日の日付。 | |||
Attributes: | |||
_tables (list[str]): ツイートのリストを日毎にまとめたもの。\ | |||
一番最後の要素が `_date` に対応し、最初の要素が最近の日付となる。 | |||
_count (int): 表に追加したツイートの件数。 | |||
_date (datetime): 現在収集中のツイートの日付。 | |||
""" | |||
FILENAME: Final[str] = UserProperties.filename | FILENAME: Final[str] = UserProperties.filename | ||
"""Final[str]: ツイートを保存するファイルの名前。""" | """Final[str]: ツイートを保存するファイルの名前。""" | ||
def __init__(self, date: datetime | None = None) -> None: | def __init__(self, date: datetime | None = None) -> None: | ||
self._tables: Final[list[str]] = [''] | self._tables: Final[list[str]] = [''] | ||
self._count: int = 0 # 記録数 | self._count: int = 0 # 記録数 | ||
723行目: | 699行目: | ||
@property | @property | ||
def count(self) -> int: | def count(self) -> int: | ||
""" | """int: 表に追加したツイートの件数。""" | ||
return self._count | return self._count | ||
747行目: | 719行目: | ||
"""Wikiテーブルをファイル出力する。""" | """Wikiテーブルをファイル出力する。""" | ||
self._next_day() | self._next_day() | ||
Path(self.FILENAME).write_text( | |||
'\n'.join(reversed(self._tables)), 'utf-8' | |||
) | |||
logger.info('テキストファイル手に入ったやで〜') | logger.info('テキストファイル手に入ったやで〜') | ||
865行目: | 836行目: | ||
@staticmethod | @staticmethod | ||
def archive_url( | def archive_url( | ||
url: str, | |||
archived_url: str, | |||
text: str | None = None | |||
) -> str: | |||
"""URLをArchiveテンプレートでラップする。 | """URLをArchiveテンプレートでラップする。 | ||
1,013行目: | 985行目: | ||
def __init__(self) -> None: | def __init__(self) -> None: | ||
self._check_constants() # スラッシュが抜けてないかチェック | self._check_constants() # スラッシュが抜けてないかチェック | ||
self._has_ffmpeg: Final[bool] = self._check_ffmpeg() # ffmpegがあるかチェック | self._has_ffmpeg: Final[bool] = self._check_ffmpeg() # ffmpegがあるかチェック | ||
1,023行目: | 994行目: | ||
self._url_query_pattern: Final[re.Pattern[str]] = re.compile(r'\?.*$') | self._url_query_pattern: Final[re.Pattern[str]] = re.compile(r'\?.*$') | ||
def _set_queries(self, accessor: AccessorHandler, krsw: | def _set_queries( | ||
self, accessor: AccessorHandler, krsw: str | None | |||
) -> bool: | |||
"""検索条件を設定する。 | """検索条件を設定する。 | ||
1,031行目: | 1,004行目: | ||
Args: | Args: | ||
accessor (AccessorHandler): アクセスハンドラ | accessor (AccessorHandler): アクセスハンドラ | ||
krsw ( | krsw (str | None): `None` でない場合、名前が :const:`~CALLINSHOW` になり、\ | ||
クエリと終わりにするツイートが無しになる。 | クエリと終わりにするツイートが無しになる。\ | ||
更に空文字でもない場合、この引数が終わりにするツイートになる。 | |||
Returns: | Returns: | ||
1,039行目: | 1,013行目: | ||
# ユーザー名取得 | # ユーザー名取得 | ||
if krsw: | if krsw is not None: | ||
logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます') | logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます') | ||
self._name: str = self.CALLINSHOW | self._name: str = self.CALLINSHOW | ||
1,051行目: | 1,025行目: | ||
# 検索クエリとページ取得 | # 検索クエリとページ取得 | ||
self._query_strs: list[str] = [] | self._query_strs: list[str] = [] | ||
if krsw: | if krsw is not None: | ||
logger.info('クエリは自動的になしにナリます') | logger.info('クエリは自動的になしにナリます') | ||
else: | else: | ||
1,072行目: | 1,046行目: | ||
# 終わりにするツイート取得 | # 終わりにするツイート取得 | ||
if krsw: | if krsw == '': | ||
logger.info('終わりにするツイートは自動的になしにナリます') | logger.info('終わりにするツイートは自動的になしにナリます') | ||
self._stop: str = '' | |||
elif krsw is not None: | |||
logger.info(f'終わりにするツイートは自動的に"{krsw}"にナリます') | |||
self._stop: str = krsw | |||
else: | |||
self._stop: str = self._input_stop_word() | |||
logger.info( | logger.info( | ||
1,135行目: | 1,114行目: | ||
try: | try: | ||
accessor.request_once(self.NITTER_INSTANCE) | accessor.request_once(self.NITTER_INSTANCE) | ||
except AccessError | except AccessError: | ||
logger.critical('インスタンスが死んでますを', exc_info=True) | |||
logger.critical('インスタンスが死んでますを') | |||
sys.exit(1) | sys.exit(1) | ||
logger.info('Nitter OK') | logger.info('Nitter OK') | ||
1,153行目: | 1,131行目: | ||
try: | try: | ||
accessor.request_once(self.ARCHIVE_TODAY) | accessor.request_once(self.ARCHIVE_TODAY) | ||
except AccessError | except AccessError: # エラー発生時は終了 | ||
logger.critical('インスタンスが死んでますを', exc_info=True) | |||
logger.critical('インスタンスが死んでますを') | |||
sys.exit(1) | sys.exit(1) | ||
logger.info('archive.today OK') | logger.info('archive.today OK') | ||
def _invidious_instances( | def _invidious_instances( | ||
self, accessor: AccessorHandler | |||
) -> tuple[str, ...]: | |||
"""Invidiousのインスタンスのタプルを取得する。 | """Invidiousのインスタンスのタプルを取得する。 | ||
1,204行目: | 1,182行目: | ||
+ 'になりますを') | + 'になりますを') | ||
print('> ', end='') | print('> ', end='') | ||
account_str: | account_str: str = input() | ||
# 空欄で降臨ショー | # 空欄で降臨ショー | ||
if account_str == '': | if account_str == '': | ||
return self.CALLINSHOW | return self.CALLINSHOW | ||
else: | else: | ||
res: | res: str | None = accessor.request( | ||
urljoin(self.NITTER_INSTANCE, account_str)) | urljoin(self.NITTER_INSTANCE, account_str)) | ||
if res is None: # リクエスト失敗判定 | if res is None: # リクエスト失敗判定 | ||
self._on_fail(accessor) | self._on_fail(accessor) | ||
return None | return None | ||
soup: | soup: BeautifulSoup = BeautifulSoup(res, 'html.parser') | ||
if soup.title == self.NITTER_ERROR_TITLE: | if soup.title == self.NITTER_ERROR_TITLE: | ||
print(account_str + 'は実在の人物ではありませんでした') | print(account_str + 'は実在の人物ではありませんでした') | ||
1,265行目: | 1,243行目: | ||
def _download_media( | def _download_media( | ||
self, | |||
media_url: str, | |||
media_name: str, | |||
accessor: AccessorHandler | |||
) -> bool: | |||
"""ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | """ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | ||
1,283行目: | 1,262行目: | ||
image_bytes: Final[bytes | None] = accessor.request_image(media_url) | image_bytes: Final[bytes | None] = accessor.request_image(media_url) | ||
if image_bytes is not None: | if image_bytes is not None: | ||
Path(self.MEDIA_DIR, media_name).write_bytes(image_bytes) | |||
return True | return True | ||
else: | else: | ||
1,290行目: | 1,268行目: | ||
def _download_m3u8( | def _download_m3u8( | ||
self, | |||
media_url: str, | |||
ts_filename: str | None, | |||
mp4_filename: str, | |||
proxies: dict[str, str] | None | |||
) -> FfmpegStatus: | |||
"""ffmpegで動画をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | """ffmpegで動画をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | ||
Args: | Args: | ||
media_url (str): 動画のm3u8/mp4ファイルのURL。 | |||
ts_filename (str): m3u8から取得したtsファイルのパス。 | ts_filename (str | None): m3u8から取得したtsファイルのパス。\ | ||
mp4_filename (str): | `None` の場合はtsファイルをダウンロードせず、直接mp4をダウンロードする。 | ||
proxies (dict[str, str] | None): | mp4_filename (str): 取得したmp4ファイルのパス。 | ||
proxies (dict[str, str] | None): ffmpegでの通信に用いるプロキシ設定。 | |||
Returns: | Returns: | ||
FfmpegStatus: ffmpegでの保存ステータス。 | FfmpegStatus: ffmpegでの保存ステータス。 | ||
""" | """ | ||
os.makedirs(self.MEDIA_DIR, exist_ok=True) | |||
if ts_filename is not None: | |||
ts_returncode: Final[int] = subprocess.run( | |||
[ | |||
'ffmpeg', '-y', | |||
'-http_proxy', 'proxies["http"]', | |||
'-i', media_url, | |||
'-c', 'copy', ts_filename | |||
] if proxies is not None else [ | |||
'ffmpeg', '-y', | |||
'-i', media_url, | |||
'-c', 'copy', ts_filename | |||
], | |||
stdout=subprocess.DEVNULL).returncode | |||
# 取得成功したらtsをmp4に変換 | |||
if ts_returncode == 0: | |||
ts2mp4_returncode: Final[int] = subprocess.run( | |||
[ | |||
'ffmpeg', '-y', '-i', ts_filename, | |||
'-acodec', 'copy', '-vcodec', 'copy', mp4_filename | |||
], | |||
stdout=subprocess.DEVNULL | |||
).returncode | |||
if ts2mp4_returncode == 0: | |||
return FfmpegStatus.MP4 | |||
else: | |||
return FfmpegStatus.TS | |||
else: | |||
return FfmpegStatus.FAILED | |||
else: | |||
returncode: Final[int] = subprocess.run( | |||
[ | [ | ||
'ffmpeg', '-y', '-i', | 'ffmpeg', '-y', | ||
'- | '-http_proxy', 'proxies["http"]', | ||
'-i', media_url, | |||
'-c', 'copy', mp4_filename | |||
] if proxies is not None else [ | |||
'ffmpeg', '-y', | |||
'-i', media_url, | |||
'-c', 'copy', mp4_filename | |||
], | ], | ||
stdout=subprocess.DEVNULL | stdout=subprocess.DEVNULL).returncode | ||
if returncode == 0: | |||
if | |||
return FfmpegStatus.MP4 | return FfmpegStatus.MP4 | ||
else: | else: | ||
return FfmpegStatus.FAILED | |||
def _tweet_date(self, url: str) -> datetime: | def _tweet_date(self, url: str) -> datetime: | ||
1,352行目: | 1,352行目: | ||
def _fetch_tweet_media( | def _fetch_tweet_media( | ||
self, | |||
tweet: Tag, | |||
tweet_url: str, | |||
accessor: AccessorHandler | |||
) -> str: | |||
"""ツイートの画像や動画を取得する。 | """ツイートの画像や動画を取得する。 | ||
1,375行目: | 1,376行目: | ||
for image_a in tweet_media.select('.attachment.image a'): | for image_a in tweet_media.select('.attachment.image a'): | ||
try: | try: | ||
href: | href: str | list[str] | None = image_a.get('href') | ||
assert isinstance(href, str) | assert isinstance(href, str) | ||
img_matched: | img_matched: re.Match[str] | None = ( | ||
self._img_ext_pattern.search(href)) | self._img_ext_pattern.search(href)) | ||
assert img_matched is not None | assert img_matched is not None | ||
media_name: | media_name: str = img_matched.group(1) | ||
media_list.append(f'[[ファイル:{media_name}|240px]]') | media_list.append(f'[[ファイル:{media_name}|240px]]') | ||
if self._download_media( | if self._download_media( | ||
1,406行目: | 1,407行目: | ||
# videoタグがない場合は取得できない | # videoタグがない場合は取得できない | ||
video: | video: Tag | None = video_container.select_one('video') | ||
if video is None: | if video is None: | ||
logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能') | logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能') | ||
1,412行目: | 1,413行目: | ||
continue | continue | ||
data_url: | # videoタグのdata-url属性またはvideoタグ直下のsourceタグからURLが取得できる | ||
assert isinstance( | data_url: str | list[str] | None = video.get('data-url') | ||
video_matched: | source_tag: Tag | None = video.select_one('source') | ||
src_url: str | list[str] | None = \ | |||
source_tag.get('src') if source_tag is not None else None | |||
video_url: str | list[str] | None = data_url or src_url | |||
assert isinstance(video_url, str) | |||
tweet_id: str = tweet_url.split('/')[-1] | |||
if data_url is not None: | |||
# data-url属性からURLを取得した場合 | |||
mp4_filename: | video_matched: re.Match[str] | None = re.search( | ||
r'[^/]+$', video_url) | |||
assert video_matched is not None | |||
media_path: str = unquote(video_matched.group()) | |||
media_url: str = urljoin(self.NITTER_INSTANCE, media_path) | |||
ts_filename: str | None = ( | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts' | |||
) | |||
else: | |||
# sourceタグからURLを取得した場合 | |||
media_url: str = video_url | |||
ts_filename: str | None = None | |||
mp4_filename: str = ( | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4' | f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4' | ||
) | ) | ||
match self._download_m3u8( | match self._download_m3u8( | ||
media_url, | |||
ts_filename, | ts_filename, | ||
mp4_filename, | mp4_filename, | ||
1,491行目: | 1,504行目: | ||
assert poll_info is not None | assert poll_info is not None | ||
for poll_meter in poll_meters: | for poll_meter in poll_meters: | ||
poll_choice_value: | poll_choice_value: Tag | None = poll_meter.select_one( | ||
'.poll-choice-value') | '.poll-choice-value') | ||
assert poll_choice_value is not None | assert poll_choice_value is not None | ||
ratio: | ratio: str = poll_choice_value.text | ||
poll_choice_option: | poll_choice_option: Tag | None = poll_meter.select_one( | ||
'.poll-choice-option') | '.poll-choice-option') | ||
assert poll_choice_option is not None | assert poll_choice_option is not None | ||
1,554行目: | 1,567行目: | ||
urls_in_tweet: Final[ResultSet[Tag]] = tag.find_all('a') | urls_in_tweet: Final[ResultSet[Tag]] = tag.find_all('a') | ||
for url in urls_in_tweet: | for url in urls_in_tweet: | ||
href: | href: str | list[str] | None = url.get('href') | ||
assert isinstance(href, str) | assert isinstance(href, str) | ||
1,574行目: | 1,587行目: | ||
and self._invidious_pattern.search(href)): | and self._invidious_pattern.search(href)): | ||
# Nitter上のYouTubeへのリンクをInvidiousのものから直す | # Nitter上のYouTubeへのリンクをInvidiousのものから直す | ||
invidious_href: | invidious_href: str | list[str] | None = ( | ||
self._invidious_pattern.sub( | self._invidious_pattern.sub( | ||
'youtube.com' if ( | 'youtube.com' if ( | ||
1,594行目: | 1,607行目: | ||
elif url.text.startswith('@'): | elif url.text.startswith('@'): | ||
url_link: str = urljoin(self.TWITTER_URL, href) | url_link: str = urljoin(self.TWITTER_URL, href) | ||
url_text: | url_text: str = url.text | ||
url.replace_with( | url.replace_with( | ||
self._archive_url( | self._archive_url( | ||
1,600行目: | 1,613行目: | ||
def _archive_url( | def _archive_url( | ||
self, | |||
url: str, | |||
accessor: AccessorHandler, | |||
text: str | None = None | |||
) -> str: | |||
"""URLをArchiveテンプレートでラップする。 | """URLをArchiveテンプレートでラップする。 | ||
1,693行目: | 1,707行目: | ||
tweets: Final[list[Tag]] = self._get_timeline_items(soup) | tweets: Final[list[Tag]] = self._get_timeline_items(soup) | ||
for tweet in tweets: | for tweet in tweets: | ||
tweet_a: | tweet_a: Tag | None = tweet.a | ||
assert tweet_a is not None | assert tweet_a is not None | ||
if tweet_a.text == self.NEWEST: | if tweet_a.text == self.NEWEST: | ||
1,714行目: | 1,728行目: | ||
continue | continue | ||
tweet_link: | tweet_link: Tag | NavigableString | None = tweet.find( | ||
class_='tweet-link') | class_='tweet-link') | ||
assert isinstance(tweet_link, Tag) | assert isinstance(tweet_link, Tag) | ||
href: | href: str | list[str] | None = tweet_link.get('href') | ||
assert isinstance(href, str) | assert isinstance(href, str) | ||
tweet_url: | tweet_url: str = urljoin( | ||
self.TWITTER_URL, | self.TWITTER_URL, | ||
self._url_fragment_pattern.sub('', href)) | self._url_fragment_pattern.sub('', href)) | ||
# 日付の更新処理 | # 日付の更新処理 | ||
date: | date: datetime = self._tweet_date(tweet_url) | ||
self._table_builder.next_day_if_necessary(date) | self._table_builder.next_day_if_necessary(date) | ||
tweet_callinshow_template: | tweet_callinshow_template: str = self._callinshowlink_url( | ||
tweet_url, accessor) | tweet_url, accessor) | ||
tweet_content: | tweet_content: Tag | NavigableString | None = tweet.find( | ||
class_='tweet-content media-body') | class_='tweet-content media-body') | ||
assert isinstance(tweet_content, Tag) | assert isinstance(tweet_content, Tag) | ||
self._archive_soup(tweet_content, accessor) | self._archive_soup(tweet_content, accessor) | ||
media_txt: | media_txt: str = self._fetch_tweet_media( | ||
tweet, tweet_url, accessor) | tweet, tweet_url, accessor) | ||
quote_txt: | quote_txt: str = self._get_tweet_quote(tweet, accessor) | ||
poll_txt: | poll_txt: str = self._get_tweet_poll(tweet) | ||
self._table_builder.append( | self._table_builder.append( | ||
tweet_callinshow_template, '<br>\n'.join( | tweet_callinshow_template, '<br>\n'.join( | ||
1,778行目: | 1,792行目: | ||
new_url: str = '' | new_url: str = '' | ||
for show_more in show_mores: # show-moreに次ページへのリンクか前ページへのリンクがある | for show_more in show_mores: # show-moreに次ページへのリンクか前ページへのリンクがある | ||
show_more_a: | show_more_a: Tag | None = show_more.a | ||
assert show_more_a is not None | assert show_more_a is not None | ||
href: | href: str | list[str] | None = show_more_a.get('href') | ||
assert isinstance(href, str) | assert isinstance(href, str) | ||
new_url = urljoin( | new_url = urljoin( | ||
1,805行目: | 1,819行目: | ||
def _signal_handler( | def _signal_handler( | ||
self, signum: int, frame: FrameType | None | |||
) -> NoReturn: | |||
"""ユーザがCtrl + Cでプログラムを止めたときのシグナルハンドラ。 | """ユーザがCtrl + Cでプログラムを止めたときのシグナルハンドラ。 | ||
1,822行目: | 1,837行目: | ||
'See also: https://nitter.cz' | 'See also: https://nitter.cz' | ||
'\033[0m') | '\033[0m') | ||
def execute(self, krsw: | def execute( | ||
self, | |||
krsw: str | None = None, | |||
use_browser: bool = True, | |||
enable_javascript: bool = True | |||
) -> None: | |||
"""通信が必要な部分のロジック。 | """通信が必要な部分のロジック。 | ||
Args: | Args: | ||
krsw ( | krsw (str | None, optional): `None` でない場合、名前が自動で \ | ||
:const:`~CALLINSHOW` になり、クエリと終わりにするツイートが自動で無しになる。\ | |||
更に空文字でもない場合、この引数が終わりにするツイートになる。 | |||
use_browser (bool, optional): `True` ならSeleniumを利用する。\ | use_browser (bool, optional): `True` ならSeleniumを利用する。\ | ||
`False` ならRequestsのみでアクセスする。 | `False` ならRequestsのみでアクセスする。 | ||
1,863行目: | 1,883行目: | ||
if not self._go_to_new_page(accessor): | if not self._go_to_new_page(accessor): | ||
break | break | ||
except BaseException | except BaseException: | ||
logger.critical('予想外のエラーナリ。ここまでの成果をダンプして終了するナリ。') | logger.critical('予想外のエラーナリ。ここまでの成果をダンプして終了するナリ。') | ||
self._table_builder.dump_file() | self._table_builder.dump_file() | ||
raise | raise | ||
1,890行目: | 1,910行目: | ||
* ちゃんとテストする。 | * ちゃんとテストする。 | ||
""" | """ | ||
WIKI_URL: Final[str] = 'https://krsw-wiki.in' | |||
"""Final[str]: WikiのURL。""" | |||
TEMPLATE_URL: Final[str] = ' | TEMPLATE_URL: Final[str] = WIKI_URL + '/wiki/テンプレート:降臨ショー恒心ログ' | ||
"""Final[str]: テンプレート:降臨ショー恒心ログのURL。""" | """Final[str]: テンプレート:降臨ショー恒心ログのURL。""" | ||
1,899行目: | 1,921行目: | ||
@override | @override | ||
def _set_queries(self, accessor: AccessorHandler, krsw: | def _set_queries( | ||
self, accessor: AccessorHandler, krsw: str | None | |||
) -> bool: | |||
"""検索条件を設定する。 | """検索条件を設定する。 | ||
1,907行目: | 1,931行目: | ||
Args: | Args: | ||
accessor (AccessorHandler): アクセスハンドラ | accessor (AccessorHandler): アクセスハンドラ | ||
krsw ( | krsw (str | None): `None` でない場合、名前が :const:`~CALLINSHOW` になる。 | ||
Returns: | Returns: | ||
1,914行目: | 1,938行目: | ||
# ユーザー名取得 | # ユーザー名取得 | ||
if krsw: | if krsw is not None: | ||
logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます') | logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます') | ||
self._name: str = self.CALLINSHOW | self._name: str = self.CALLINSHOW | ||
1,992行目: | 2,016行目: | ||
urls: Final[list[str]] = list(map( | urls: Final[list[str]] = list(map( | ||
lambda x: | lambda x: self.WIKI_URL + assert_get(x, 'href'), | ||
template_soup.select('.wikitable > tbody > tr > td a'))) | template_soup.select('.wikitable > tbody > tr > td a'))) | ||
for url in urls: | for url in urls: | ||
logger.info(f'{unquote(url)} で収集済みツイートを探索中でふ') | logger.info(f'{unquote(url)} で収集済みツイートを探索中でふ') | ||
page: | page: str | None = accessor.request_with_requests_module(url) | ||
assert page is not None | assert page is not None | ||
soup: | soup: BeautifulSoup = BeautifulSoup(page, 'html.parser') | ||
url_as = soup.select('tr > th a') | url_as = soup.select('tr > th a') | ||
assert len(url_as) > 0 | assert len(url_as) > 0 | ||
2,020行目: | 2,043行目: | ||
for tweet in tweets: | for tweet in tweets: | ||
# リダイレクトがあればすべてのリンクを、ないなら目的のURLだけが取得できる | # リダイレクトがあればすべてのリンクを、ないなら目的のURLだけが取得できる | ||
urls: | urls: list[str] = list(map( | ||
lambda a: a.text, tweet.select('a')[1:])) | lambda a: a.text, tweet.select('a')[1:])) | ||
# 別のユーザへのリダイレクトがあるものは除く | # 別のユーザへのリダイレクトがあるものは除く | ||
2,037行目: | 2,060行目: | ||
logger.debug(url_matched.string + 'は最古の探索対象よりも古いのでポア') | logger.debug(url_matched.string + 'は最古の探索対象よりも古いのでポア') | ||
continue | continue | ||
a_first_child: | a_first_child: Tag | None = tweet.select_one('a:first-child') | ||
assert a_first_child is not None | assert a_first_child is not None | ||
archive_url: | archive_url: str | list[str] | None = a_first_child.get('href') | ||
assert isinstance(archive_url, str) | assert isinstance(archive_url, str) | ||
if url_matched[0] not in self._url_list_on_wiki: | if url_matched[0] not in self._url_list_on_wiki: | ||
2,057行目: | 2,078行目: | ||
def _fetch_next_page( | def _fetch_next_page( | ||
self, soup: Tag, accessor: AccessorHandler | |||
) -> str | None: | |||
"""archive.todayの検索結果のページをpaginateする。 | """archive.todayの検索結果のページをpaginateする。 | ||
2,087行目: | 2,109行目: | ||
while has_next: | while has_next: | ||
self._append_tweet_urls(soup) | self._append_tweet_urls(soup) | ||
next_page: | next_page: str | None = self._fetch_next_page(soup, accessor) | ||
if next_page is not None: | if next_page is not None: | ||
soup = BeautifulSoup(next_page, 'html.parser') | soup = BeautifulSoup(next_page, 'html.parser') | ||
2,095行目: | 2,116行目: | ||
def _next_url( | def _next_url( | ||
self, | |||
accessor: AccessorHandler, | |||
tweet_url_prefix: str, | |||
incremented_num: int, | |||
incremented: bool = False | |||
) -> None: | |||
"""ツイートのURLを、数字部分をインクリメントしながら探索する。 | """ツイートのURLを、数字部分をインクリメントしながら探索する。 | ||
2,177行目: | 2,199行目: | ||
True) | True) | ||
def _parse_images(self, soup: Tag, | def _parse_images( | ||
self, soup: Tag, accessor: AccessorHandler | |||
) -> tuple[str, ...]: | |||
"""ツイートの魚拓から画像をダウンロードし、ファイル名のタプルを返す。 | """ツイートの魚拓から画像をダウンロードし、ファイル名のタプルを返す。 | ||
2,230行目: | 2,253行目: | ||
if len(internal_a_tags) > 0: | if len(internal_a_tags) > 0: | ||
for a_tag in internal_a_tags: | for a_tag in internal_a_tags: | ||
account_name: | account_name: str = a_tag.text | ||
if account_name.startswith('#'): | if account_name.startswith('#'): | ||
a_tag.replace_with(a_tag.text) | a_tag.replace_with(a_tag.text) | ||
else: | else: | ||
url: | url: str = urljoin(self.TWITTER_URL, account_name[1:]) | ||
a_tag.replace_with(TableBuilder.archive_url( | a_tag.replace_with(TableBuilder.archive_url( | ||
url, | url, | ||
2,331行目: | 2,353行目: | ||
def _get_tweet_from_archive( | def _get_tweet_from_archive( | ||
self, | |||
url_pairs: list[UrlTuple], | |||
accessor: AccessorHandler | |||
) -> None: | |||
"""魚拓からツイート本文を取得する。 | """魚拓からツイート本文を取得する。 | ||
2,343行目: | 2,366行目: | ||
for url_pair in url_pairs: | for url_pair in url_pairs: | ||
page: | page: str | None = accessor.request(url_pair.archive_url) | ||
assert page is not None | assert page is not None | ||
article: | article: Tag | None = BeautifulSoup( | ||
page, 'html.parser' | page, 'html.parser' | ||
).select_one('article[tabindex="-1"]') | ).select_one('article[tabindex="-1"]') | ||
2,353行目: | 2,376行目: | ||
logger.debug(url_pair.url + 'を整形しますを') | logger.debug(url_pair.url + 'を整形しますを') | ||
tweet_date: | tweet_date: datetime = self._tweet_date(url_pair.url) | ||
table_builder.next_day_if_necessary(tweet_date) | table_builder.next_day_if_necessary(tweet_date) | ||
2,373行目: | 2,396行目: | ||
# YouTube等のリンク | # YouTube等のリンク | ||
card_tag: | card_tag: Tag | None = article.select_one( | ||
'div[aria-label="Play"]:not(div[role="link"] ' | 'div[aria-label="Play"]:not(div[role="link"] ' | ||
'div[aria-label="Play"])') | 'div[aria-label="Play"])') | ||
2,382行目: | 2,405行目: | ||
# 画像に埋め込まれた外部サイトへのリンク | # 画像に埋め込まれた外部サイトへのリンク | ||
article_tag: | article_tag: Tag | None = article.select_one( | ||
'a[role="link"][aria-label] img:not(div[role="link"] ' | 'a[role="link"][aria-label] img:not(div[role="link"] ' | ||
'a[role="link"][aria-label] img)') | 'a[role="link"][aria-label] img)') | ||
2,391行目: | 2,414行目: | ||
# 引用の有無のチェック | # 引用の有無のチェック | ||
retweet_tag: | retweet_tag: Tag | None = article.select_one( | ||
'div[role="link"]') | 'div[role="link"]') | ||
if retweet_tag is not None: | if retweet_tag is not None: | ||
account_name_tag: | account_name_tag: Tag | None = ( | ||
retweet_tag.select_one( | retweet_tag.select_one( | ||
'div[tabindex="-1"] > div > span:not(:has(> *))')) # noqa: E501 | 'div[tabindex="-1"] > div > span:not(:has(> *))')) # noqa: E501 | ||
2,412行目: | 2,435行目: | ||
# 投票の処理 | # 投票の処理 | ||
poll_txt: | poll_txt: str = self._get_tweet_poll(article) | ||
text = self._concat_texts(text, poll_txt) | text = self._concat_texts(text, poll_txt) | ||
# バージョンの処理 | # バージョンの処理 | ||
possible_version_text_tags: | possible_version_text_tags: ResultSet[Tag] = ( | ||
article.select( | article.select( | ||
'div > span > ' | 'div > span > ' | ||
2,429行目: | 2,452行目: | ||
except Exception as e: | except Exception as e: | ||
# エラーが起きても止めない | # エラーが起きても止めない | ||
logger.exception( | logger.exception('エラーが発生してツイートが取得できませんでしたを', exc_info=True) | ||
text = 'エラーが発生してツイートが取得できませんでした\n' + ''.join( | text = 'エラーが発生してツイートが取得できませんでした\n' + ''.join( | ||
TracebackException.from_exception(e).format()) | TracebackException.from_exception(e).format()) | ||
table_builder.append(tweet_callinshow_template, text) | table_builder.append(tweet_callinshow_template, text) | ||
else: | else: | ||
logger. | logger.warning(url_pair.url + 'はリツイートなので飛ばすナリ。' | ||
'URLリスト収集の時点でフィルタできなかった、これはいけない。') | |||
table_builder.dump_file() | table_builder.dump_file() | ||
@override | @override | ||
def execute(self, krsw: | def execute( | ||
self, | |||
krsw: str | None = None, | |||
use_browser: bool = True, | |||
enable_javascript: bool = True | |||
) -> None: | |||
"""通信が必要な部分のロジック。 | """通信が必要な部分のロジック。 | ||
Args: | Args: | ||
krsw ( | krsw (str | None, optional): `None` でない場合、名前が自動で \ | ||
:const:`~CALLINSHOW` になる。 | |||
use_browser (bool, optional): `True` ならSeleniumを利用する。\ | use_browser (bool, optional): `True` ならSeleniumを利用する。\ | ||
`False` ならRequestsのみでアクセスする。 | `False` ならRequestsのみでアクセスする。 | ||
2,468行目: | 2,495行目: | ||
sys.exit(1) | sys.exit(1) | ||
# Wikiに既に掲載されているツイートのURLを取得 | # Wikiに既に掲載されているツイートのURLを取得 | ||
self._get_tweet_urls_from_wiki(accessor) | self._get_tweet_urls_from_wiki(accessor) # Wikiに接続できない時はここをコメントアウト | ||
# 未掲載のツイートのURLを取得する | # 未掲載のツイートのURLを取得する | ||
2,475行目: | 2,502行目: | ||
# URL一覧ファイルのダンプ | # URL一覧ファイルのダンプ | ||
with | with Path(self.URL_LIST_FILENAME).open('w', encoding='utf-8') as f: | ||
for url_pair in self._url_list: | for url_pair in self._url_list: | ||
f.write(url_pair.url + '\n') | f.write(url_pair.url + '\n') | ||
2,498行目: | 2,525行目: | ||
parser.add_argument( | parser.add_argument( | ||
'--krsw', | '--krsw', | ||
type=str, | |||
help='指定すると、パカデブのツイートを取得上限数まで取得する。') | nargs='?', | ||
const='', | |||
default=None, | |||
help=('指定すると、パカデブのツイートを取得上限数まで取得する。' | |||
'更に--search-unarchivedモードでこのオプションに引数を与えると、' | |||
'その文言が終わりにするツイートになる。')) | |||
parser.add_argument( | parser.add_argument( | ||
'-n', | '-n', |
2024年11月9日 (土) 16:59時点における最新版
とりあえず取り急ぎ。バグ報告は利用者・トーク:夜泣き
コード
#!/usr/bin/env python3
# © 2022 恒心教 (Koushinism)
# Released under the MIT license
# https://opensource.org/licenses/mit-license.php
"""Twitter自動収集スクリプト
ver4.4.2 2024/11/9恒心
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です。
前開発者との出会いに感謝。
本スクリプトは `archive.today <https://archive.today>`_ からWikiに未掲載のツイートを収集します。
Examples:
定数類は状況に応じて変えてください。:class:`~UserProperties` で変更できます。
::
$ python3 (ファイル名)
オプションに ``--krsw`` とつけると自動モードになります。
::
$ python3 (ファイル名) --krsw
自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。
つまりユーザー入力が要りません。
``--no-browser`` オプションでTor Browserを使用しないモードに、
``--disable-script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。
``--search-unarchived`` オプションでは、従来の通りNitterからツイートを収集するモードになります (廃止予定)。
Note:
* Pythonのバージョンは3.12以上
* 環境は玉葱前提です。
* TailsやWhonixでない場合、Tor Browserを入れておくか、torコマンドでプロキシを立てておくことが必要です。
* MacOSで動作確認済
* MacOSの場合はTor Browserをダウンロードするかbrewでtorコマンドを導入してから実行する
* Whonix-WorkstationなどLinuxやWindowsでの動作が未確認です。確認と修正をお願いします
* PySocks, bs4, seleniumはインストールしないと標準で入ってません
* requests, typing_extensionsも環境によっては入っていないかもしれない
.. code-block:: bash
$ python3 -m pip install bs4 requests PySocks selenium typing_extensions
* pipも入っていなければ ``$ sudo apt install pip``
* `ffmpeg <https://ffmpeg.org>`_ が入っていると動画も自動取得しますが、無くても動きます
* バグ報告は `利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて \
<https://krsw-wiki.in/wiki/利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて>`_
"""
import json
import logging
import os
import platform
import random
import re
import shutil
import signal
import subprocess
import sys
import warnings
from abc import ABCMeta, abstractmethod
from argparse import ArgumentParser, Namespace
from collections import deque
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from time import sleep
from traceback import TracebackException
from types import FrameType, MappingProxyType, TracebackType
from typing import (Final, NamedTuple, NoReturn, Self, TextIO, assert_never,
final, override)
from urllib.parse import (ParseResult, parse_qs, quote, unquote, urljoin,
urlparse)
from zoneinfo import ZoneInfo
import requests
from bs4 import BeautifulSoup
from bs4.element import NavigableString, ResultSet, Tag
from selenium.common.exceptions import (InvalidSwitchToTargetException,
WebDriverException)
from selenium.webdriver import Firefox as FirefoxDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.wait import WebDriverWait
from typing_extensions import deprecated
@dataclass(init=False, eq=False, frozen=True)
class UserProperties:
"""ユーザ設定。
実行時に変更する可能性のある定数はここで定義する。
"""
media_dir: Final[str] = 'tweet_media'
"""Final[str]: 画像などのツイートメディアをダウンロードするディレクトリ。"""
filename: Final[str] = 'tweet.txt'
"""Final[str]: ツイートを保存するファイルの名前。"""
log_file: Final[str] = 'twitter_archiver.log'
"""Final[str]: ログを保存するファイルの名前。"""
@dataclass(init=False, eq=False, frozen=True)
class NitterCrawler:
"""``--search-unarchived`` オプション有りの時に利用する設定値。"""
limit_n_tweets: Final[int] = 100
"""Final[int]: 取得するツイート数の上限。"""
report_interval: Final[int] = 5
"""Final[int]: 記録件数を報告するインターバル。"""
nitter_instance: Final[str] = 'https://nitter.poast.org/' # noqa: E501
"""Final[str]: Nitterのインスタンス。
生きているのは https://github.com/zedeus/nitter/wiki/Instances で確認。
Note:
末尾にスラッシュ必須。
インスタンスによっては画像の取得ができない。
Tor用のインスタンスでないと動画の取得ができない。
"""
@dataclass(init=False, eq=False, frozen=True)
class ArchiveCrawler:
"""``--search-unarchived`` オプション無しの時に使用する設定値。"""
url_list_filename: Final[str] = 'url_list.txt'
"""Final[str]: URLのリストをダンプするファイル名。"""
# ログ設定
# basicConfigでレベルを設定するとモジュールのDEBUGログなども出力される
formatter: Final[logging.Formatter] = logging.Formatter(
fmt='{asctime} [{levelname:.4}] : {message}', style='{')
logger: Final[logging.Logger] = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# 標準エラー出力へのログ出力
stream_handler: Final[logging.StreamHandler[TextIO]] = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
# ファイルへのログ出力
file_handler: Final[logging.FileHandler] = (
logging.FileHandler(UserProperties.log_file))
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
class AccessError(Exception):
"""RequestsとSeleniumで共通のアクセスエラー。"""
pass
class ReCaptchaRequiredError(Exception):
"""JavaScriptがオフの時にreCAPTCHAを要求された場合のエラー。"""
pass
class AbstractAccessor(metaclass=ABCMeta):
"""HTTPリクエストでWebサイトに接続するための基底クラス。"""
WAIT_TIME: Final[int] = 1
"""Final[int]: HTTPリクエスト成功失敗関わらず待機時間(秒)。
Note:
1秒待つだけで行儀がいいクローラーだそうなので既定では1秒。
しかし日本のポリホーモは1秒待っていても捕まえてくるので注意 [1]_。
References:
.. [1] 岡崎市立中央図書館事件. (2022, June 21). In Wikipedia.
https://ja.wikipedia.org/wiki/?curid=2187212&oldid=79121945
"""
WAIT_RANGE: Final[int] = 5
"""Final[int]: ランダムな時間待機するときの待機時間の幅(秒)。"""
REQUEST_TIMEOUT: Final[int] = 30
"""Final[int]: HTTPリクエストのタイムアウト秒数。"""
@abstractmethod
def get(self, url: str) -> str:
"""URLにアクセスして、HTMLを取得する。
Args:
url (str): 接続するURL。
Raises:
AccessError: 通信に失敗した場合のエラー。
Returns:
str: レスポンスのHTML。
"""
...
@abstractmethod
def set_cookies(self, cookies: dict[str, str]) -> None:
"""Cookieをセットする。
Args:
cookies (dict[str, str]): Cookieのキーバリューペア。
"""
...
@final
def _random_sleep(self) -> None:
"""ランダムな秒数スリープする。
自動操縦だとWebサイトに見破られないため。
"""
sleep(random.randrange(self.WAIT_TIME,
self.WAIT_TIME + self.WAIT_RANGE))
class RequestsAccessor(AbstractAccessor):
"""RequestsモジュールでWebサイトに接続するためのクラス。"""
HEADERS: Final[dict[str, str]] = {
'User-Agent':
'Mozilla/5.0 (X11; Linux i686; rv:109.0) Gecko/20100101 Firefox/120.0' # noqa: E501
}
"""Final[dict[str, str]]: HTTPリクエスト時のヘッダ。"""
PROXIES_WITH_BROWSER: Final[dict[str, str]] = {
'http': 'socks5h://127.0.0.1:9150',
'https': 'socks5h://127.0.0.1:9150'
}
"""Final[dict[str, str]]: Tor Browserを起動しているときのHTTPプロキシの設定。"""
PROXIES_WITH_COMMAND: Final[dict[str, str]] = {
'http': 'socks5h://127.0.0.1:9050',
'https': 'socks5h://127.0.0.1:9050'
}
"""Final[dict[str, str]]: torコマンドを起動しているときのHTTPプロキシの設定。"""
TOR_CHECK_URL: Final[str] = 'https://check.torproject.org/api/ip'
"""Final[str]: Tor経由で通信しているかチェックするサイトのURL。"""
def __init__(self) -> None:
# Torに必要なプロキシをセット
self._cookies: dict[str, str] = {}
self._proxies: dict[str, str] | None = self._choose_tor_proxies()
def _execute(self, url: str) -> requests.models.Response:
"""引数のURLにRequestsモジュールでHTTP接続する。
Args:
url (str): 接続するURL。
Raises:
requests.HTTPError: ステータスコードが200でない場合のエラー。
Returns:
requests.models.Response: レスポンスのオブジェクト。
"""
sleep(self.WAIT_TIME) # DoS対策で待つ
res: Final[requests.models.Response] = requests.get(
url,
timeout=self.REQUEST_TIMEOUT,
headers=self.HEADERS,
allow_redirects=False,
proxies=self._proxies,
cookies=self._cookies)
res.raise_for_status() # HTTPステータスコードが200番台以外でエラー発生
return res
@override
def get(self, url: str) -> str:
try:
return self._execute(url).text
except (requests.HTTPError, requests.ConnectionError) as e:
raise AccessError from e
@override
def set_cookies(self, cookies: dict[str, str]) -> None:
self._cookies.update(cookies)
def get_image(self, url: str) -> bytes | None:
"""引数のURLから画像のバイナリ列を取得する。
Args:
url (str): 接続するURL。
Raises:
AccessError: アクセスに失敗した場合のエラー。
Returns:
bytes | None: 画像のバイナリ。画像でなければ `None`。
"""
try:
res: Final[requests.models.Response] = self._execute(url)
except (requests.HTTPError, requests.ConnectionError) as e:
raise AccessError from e
if 'image' in res.headers['content-type']:
return res.content
else:
return None
def _choose_tor_proxies(self) -> dict[str, str] | None:
"""Torを使うのに必要なプロキシ情報を返す。
プロキシなしで接続できれば `None`、
Tor Browserのプロキシで接続できるなら :const:`~PROXIES_WITH_BROWSER`、
torコマンドのプロキシで接続できるなら :const:`~PROXIES_WITH_COMMAND`。
いずれでもアクセスできなければ異常終了する。
Raises:
AccessError: :const:`~TOR_CHECK_URL` にアクセスできない場合のエラー。
Returns:
dict[str, str] | None: プロキシ情報。
"""
logger.info('Torのチェック中ですを')
# プロキシなしでTorにアクセスできるかどうか
self._proxies = None
res: str = self._execute(self.TOR_CHECK_URL).text
is_tor: bool = json.loads(res)['IsTor']
if is_tor:
logger.info('Tor connection OK')
return None
# Tor BrowserのプロキシでTorにアクセスできるかどうか
try:
self._proxies = self.PROXIES_WITH_BROWSER
res = self._execute(self.TOR_CHECK_URL).text
is_tor = json.loads(res)['IsTor']
if is_tor:
logger.info('Tor browser connection OK')
return self.PROXIES_WITH_BROWSER
except requests.exceptions.ConnectionError:
pass
# torコマンドのプロキシでTorにアクセスできるかどうか
try:
self._proxies = self.PROXIES_WITH_COMMAND
res = self._execute(self.TOR_CHECK_URL).text
is_tor = json.loads(res)['IsTor']
if is_tor:
logger.info('Tor proxy OK')
return self.PROXIES_WITH_COMMAND
else:
raise AccessError('サイトにTorのIPでアクセスできていないなりを')
except requests.exceptions.ConnectionError:
logger.critical('通信がTorのSOCKS proxyを経由していないなりを', exc_info=True)
sys.exit(1)
@property
def proxies(self) -> dict[str, str] | None:
"""dict[str, str] | None: プロキシ設定。"""
return self._proxies
class SeleniumAccessor(AbstractAccessor):
"""SeleniumでWebサイトに接続するためのクラス。
Args:
enable_javascript (bool): JavaScriptを有効にするかどうか。reCAPTCHA対策に必要。
"""
TOR_BROWSER_PATHS: Final[MappingProxyType[str, str]] = MappingProxyType({
'Windows': r'C:\Program Files\Tor Browser\Browser\firefox.exe',
'Darwin': '/Applications/Tor Browser.app/Contents/MacOS/firefox',
'Linux': '/usr/bin/torbrowser'
})
"""Final[MappingProxyType[str, str]]: OSごとのTor Browserのパス。"""
WEB_DRIVER_WAIT_TIME: Final[int] = 15
"""Final[int]: 最初のTor接続時の待機時間(秒)。"""
WAIT_TIME_FOR_RECAPTCHA: Final[int] = 10_000
"""Final[int]: reCAPTCHAのための待機時間(秒)。"""
def __init__(self, enable_javascript: bool) -> None:
self._options: Final[FirefoxOptions] = FirefoxOptions()
self._options.binary_location = (
self.TOR_BROWSER_PATHS[platform.system()]
)
self._options.add_argument('--user-data-dir=selenium') # pyright: ignore [reportUnknownMemberType] # noqa: E501
if enable_javascript:
logger.warning('reCAPTCHA対策のためJavaScriptをonにしますを')
self._options.set_preference('javascript.enabled', enable_javascript)
self._options.set_preference('intl.accept_languages', 'en-US, en')
self._options.set_preference('intl.locale.requested', 'US')
self._options.set_preference('font.language.group', 'x-western')
# 自動操縦と見破られないための設定
self._options.set_preference('dom.webdriver.enabled', False)
self._refresh_browser()
def quit(self) -> None:
"""Seleniumドライバを終了する。"""
if hasattr(self, '_driver'):
logger.debug('ブラウザ終了')
self._driver.quit()
def _refresh_browser(self) -> None:
"""ブラウザを起動する。"""
try:
logger.debug('ブラウザ起動')
self._driver: FirefoxDriver = FirefoxDriver(
options=self._options)
sleep(1)
web_driver_wait: Final[WebDriverWait[FirefoxDriver]] = (
WebDriverWait(self._driver, self.WEB_DRIVER_WAIT_TIME))
web_driver_wait.until(
ec.element_to_be_clickable((By.ID, 'connectButton'))
)
self._driver.find_element(By.ID, 'connectButton').click()
# Torの接続が完了するまで待つ
web_driver_wait.until(ec.url_contains('about:blank'))
except BaseException:
self.quit()
raise
def _check_recaptcha(self, url: str) -> None:
"""reCAPTCHAが表示されているかどうか判定して、入力を待機する。
botであることが検知された場合、自動でブラウザを再起動する。
Args:
url (str): アクセスしようとしているURL。\
reCAPTCHAが要求されると `current_url` が変わることがあるので必要。
Raises:
ReCaptchaRequiredError: JavaScriptがオフの状態でreCAPTCHAが要求された場合のエラー。
"""
iframe_selector: Final[str] = (
'iframe[title="recaptcha challenge expires in two minutes"]')
if len(self._driver.find_elements(By.ID, 'g-recaptcha')) > 0:
if self._options.preferences.get('javascript.enabled'): # pyright: ignore[reportUnknownMemberType] # noqa: E501
logger.warning(f'{url} でreCAPTCHAが要求されたナリ')
print('reCAPTCHAを解いてね(笑)、それはできるよね。\a\a\a')
print('botバレしたら自動でブラウザが再起動するナリよ')
print('Tips: カーソルを迷ったように動かすとか、人間らしく振る舞うのがコツナリ')
WebDriverWait(self._driver, self.WEB_DRIVER_WAIT_TIME).until(
ec.presence_of_element_located(
(By.CSS_SELECTOR, iframe_selector)
)
)
self._driver.switch_to.frame( # reCAPTCHAのフレームに遷移する
self._driver.find_element(By.CSS_SELECTOR, iframe_selector)
)
try:
WebDriverWait(
self._driver, self.WAIT_TIME_FOR_RECAPTCHA
).until(
ec.visibility_of_element_located(
# bot検知された場合に現れるクラス
(By.CLASS_NAME, 'rc-doscaptcha-header')
)
)
except InvalidSwitchToTargetException:
# reCAPTCHAのフレームがなくなっていた場合
logger.info('reCAPTCHAが解かれましたを')
self._driver.switch_to.default_content()
self._random_sleep() # DoS対策で待つ
else:
# waitを普通に抜けた場合
logger.warning('botバレしたなりを')
# 一回ブラウザを落として起動し直す
self.quit()
self._refresh_browser()
self.get(url)
else:
raise ReCaptchaRequiredError(
f'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: {url}')
@override
def get(self, url: str) -> str:
self._random_sleep() # DoS対策で待つ
try:
self._driver.get(url)
WebDriverWait(self._driver, self.WEB_DRIVER_WAIT_TIME).until_not(
ec.any_of(
ec.title_is('Redirecting'),
ec.title_is('Verifying your browser | Nitter')
)
) # Nitterのリダイレクトを検知する
self._check_recaptcha(url)
except WebDriverException as e:
# Selenium固有の例外を共通の例外に変換
raise AccessError from e
sleep(5)
return self._driver.page_source
@override
def set_cookies(self, cookies: dict[str, str]) -> None:
for name, value in cookies.items():
self._driver.add_cookie( # pyright: ignore [reportUnknownMemberType] # noqa: E501
{'name': name, 'value': value})
self._driver.refresh()
class AccessorHandler:
"""WebサイトからHTMLを取得するためのクラス。
RequestsとSeleniumのどちらかを選択して使用することができ、その違いを隠蔽する。
Args:
use_browser (bool): `True` ならSeleniumを利用する。`False` ならRequestsのみでアクセスする。
enable_javascript (bool): SeleniumでJavaScriptを利用する場合は `True`。
"""
LIMIT_N_REQUESTS: Final[int] = 5
"""Final[int]: HTTPリクエスト失敗時の再試行回数。"""
WAIT_TIME_FOR_ERROR: Final[int] = 4
"""Final[int]: HTTPリクエスト失敗時にさらに追加する待機時間。"""
def __init__(self, use_browser: bool, enable_javascript: bool) -> None:
self._selenium_accessor: Final[SeleniumAccessor | None] = (
SeleniumAccessor(enable_javascript) if use_browser else None
)
self._requests_accessor: Final[RequestsAccessor] = RequestsAccessor()
self._last_url = '' # 最後にアクセスしたURL
def __enter__(self) -> Self:
return self
def __exit__(self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None) -> None:
if self._selenium_accessor is not None:
self._selenium_accessor.quit()
def request_once(self, url: str) -> str:
"""引数のURLにHTTP接続する。
Args:
url (str): 接続するURL。
Returns:
str: レスポンスのテキスト。
Raises:
AccessError: アクセスエラー。
Note:
失敗かどうかは呼出側で要判定。
"""
try:
if self._selenium_accessor is not None:
return self._selenium_accessor.get(url)
else:
return self._requests_accessor.get(url)
except AccessError:
raise
def _request_with_callable[T](
self,
url: str,
request_callable: Callable[[str], T]
) -> T | None:
"""`request_callable` の実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。
成功すると結果を返す。
接続失敗が何度も起きると `None` を返す。
Args:
url (str): 接続するURL
request_callable (Callable[[str], T]): 1回リクエストを行うメソッド。
Returns:
T | None: レスポンス。接続失敗が何度も起きると `None` を返す。
"""
assert url, 'URLが空っぽでふ'
logger.debug('Requesting ' + unquote(url))
self._last_url = url
for i in range(self.LIMIT_N_REQUESTS):
try:
res: T = request_callable(url)
except AccessError:
logger.warning(
url + 'への通信失敗ナリ '
f'{i + 1}/{self.LIMIT_N_REQUESTS}回')
logger.debug('エラーログ', exc_info=True)
if i < self.LIMIT_N_REQUESTS:
sleep(self.WAIT_TIME_FOR_ERROR) # 失敗時は長めに待つ
else:
return res
return None
def request(self, url: str) -> str | None:
"""HTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。
成功すると結果を返す。
接続失敗が何度も起きると `None` を返す。
Args:
url (str): 接続するURL。
Returns:
str | None: レスポンスのテキスト。接続失敗が何度も起きると `None` を返す。
Note:
失敗かどうかは呼出側で要判定
"""
return self._request_with_callable(url, self.request_once)
def request_with_requests_module(self, url: str) -> str | None:
"""RequestsモジュールでのHTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。
成功すると結果を返す。
接続失敗が何度も起きると `None` を返す。
Args:
url (str): 接続するURL。
Returns:
str | None: レスポンスのテキスト。接続失敗が何度も起きると `None` を返す。
Note:
失敗かどうかは呼出側で要判定
"""
return self._request_with_callable(url, self._requests_accessor.get)
def request_image(self, url: str) -> bytes | None:
"""Requestsモジュールで画像ファイルを取得する。
成功すると結果を返す。
接続失敗が何度も起きると `None` を返す。
Args:
url (str): 接続するURL。
Returns:
bytes | None: レスポンスのバイト列。接続失敗が何度も起きると `None` を返します。
Note:
失敗かどうかは呼出側で要判定
"""
return self._request_with_callable(
url,
self._requests_accessor.get_image)
def set_cookies(self, cookies: dict[str, str]) -> None:
"""Cookieをセットする。
Args:
cookies (dict[str, str]): Cookieのキーバリューペア。
"""
self._requests_accessor.set_cookies(cookies)
if self._selenium_accessor is not None:
self._selenium_accessor.set_cookies(cookies)
@property
def last_url(self) -> str:
"""str: 最後にアクセスしたURL。"""
return self._last_url
@property
def proxies(self) -> dict[str, str] | None:
"""dict[str, str] | None: RequestsAccessorオブジェクトのプロキシ設定。"""
return self._requests_accessor.proxies
class TableBuilder:
"""Wikiの表を組み立てるためのクラス。
Args:
date (datetime | None, optional): 記録するツイートの最新日付。デフォルトは今日の日付。
Attributes:
_tables (list[str]): ツイートのリストを日毎にまとめたもの。\
一番最後の要素が `_date` に対応し、最初の要素が最近の日付となる。
_count (int): 表に追加したツイートの件数。
_date (datetime): 現在収集中のツイートの日付。
"""
FILENAME: Final[str] = UserProperties.filename
"""Final[str]: ツイートを保存するファイルの名前。"""
def __init__(self, date: datetime | None = None) -> None:
self._tables: Final[list[str]] = ['']
self._count: int = 0 # 記録数
self._date: datetime = date or datetime.today()
@property
def count(self) -> int:
"""int: 表に追加したツイートの件数。"""
return self._count
def append(self, callinshow_template: str, text: str) -> None:
"""ツイートを表に追加する。
Args:
callinshow_template (str): ツイートのURLをCallinshowLinkテンプレートに入れたもの。
text (str): ツイートの本文。
"""
self._tables[-1] = (
'!' + callinshow_template + '\n|-\n|\n'
+ text
+ '\n|-\n'
+ self._tables[-1])
self._count += 1
def dump_file(self) -> None:
"""Wikiテーブルをファイル出力する。"""
self._next_day()
Path(self.FILENAME).write_text(
'\n'.join(reversed(self._tables)), 'utf-8'
)
logger.info('テキストファイル手に入ったやで〜')
def next_day_if_necessary(self, date: datetime) -> None:
"""引数dateがインスタンスの持っている日付より前の場合、日付更新処理をする。
Args:
date (datetime): 次のツイートの日付。
"""
if (date.year != self._date.year or date.month != self._date.month
or date.day != self._date.day):
self._next_day(date)
def _next_day(self, date: datetime | None = None) -> None:
"""Wikiテーブルに日付の見出しを付与し、日付を更新する。
記録済みのツイートが1つもない場合は日付の更新だけをする。
Args:
date (datetime | None, optional): 次に記録するツイートの日付。
"""
if self._tables[-1]:
self._tables[-1] = self._convert_to_text_table(self._tables[-1])
if platform.system() == 'Windows':
self._tables[-1] = self._date.strftime(
'\n=== %#m月%#d日 ===\n') + self._tables[-1]
logger.info(self._date.strftime('%#m月%#d日のツイートを取得完了ですを'))
else:
self._tables[-1] = self._date.strftime(
'\n=== %-m月%-d日 ===\n') + self._tables[-1]
logger.info(self._date.strftime('%-m月%-d日のツイートを取得完了ですを'))
if date is not None:
self._date = date
if self._tables[-1]:
self._tables.append('')
def _convert_to_text_table(self, text: str) -> str:
"""Wikiでテーブル表示にするためのヘッダとフッタをつける。
Args:
text (str): ヘッダとフッタがないWikiテーブル。
Returns:
str: テーブル表示用のヘッダとフッタがついたWikiテーブル。
"""
return '{|class="wikitable" style="text-align: left;"\n' + text + '|}'
@classmethod
def escape_wiki_reserved_words(cls, text: str) -> str:
"""MediaWikiの文法と衝突する文字を無効化する。
Args:
text (str): ツイートの文字列。
Returns:
str: MediaWikiの文法と衝突する文字がエスケープされたツイートの文字列。
"""
def escape_nolink_urls(text: str) -> str:
"""Archiveテンプレートの中にないURLがWikiでaタグに変換されないよう無効化する。
Args:
text (str): ツイートの文字列。
Returns:
str: Archiveテンプレートの中にないURLがnowikiタグでエスケープされた文字列。
"""
is_in_archive_template: bool = False
i: int = 0
while i < len(text):
if is_in_archive_template:
if text[i:i + len('}}')] == '}}':
is_in_archive_template = False
i += len('}}')
else:
if (text[i:i + len('{{Archive|')] == '{{Archive|'
or text[i:i + len('{{Archive|')] == '{{archive|'):
is_in_archive_template = True
i += len('{{Archive|')
elif text[i:i + len('https://')] == 'https://':
text = (text[:i]
+ '<nowiki>https://</nowiki>'
+ text[i + len('https://'):])
i += len('<nowiki>https://</nowiki>')
elif text[i:i + len('http://')] == 'http://':
text = (text[:i]
+ '<nowiki>http://</nowiki>'
+ text[i + len('http://'):])
i += len('<nowiki>http://</nowiki>')
i += 1
return text
if not hasattr(cls, '_escape_callables'):
# 初回呼び出しの時だけ正規表現をコンパイルする
head_space_pattern: Final[re.Pattern[str]] = re.compile(
r'^ ', re.MULTILINE)
head_marks_pattern: Final[re.Pattern[str]] = re.compile(
r'^([\*#:;])', re.MULTILINE)
bar_pattern: Final[re.Pattern[str]] = re.compile(
r'^----', re.MULTILINE)
cls._escape_callables: tuple[Callable[[str], str], ...] = (
lambda t: t.replace('\n', '<br>\n'),
lambda t: head_space_pattern.sub(' ', t),
lambda t: head_marks_pattern.sub(r'<nowiki>\1</nowiki>', t),
lambda t: bar_pattern.sub('<nowiki>----</nowiki>', t),
lambda t: escape_nolink_urls(t),
)
escaped_text: str = text
for escape_callable in cls._escape_callables:
escaped_text = escape_callable(escaped_text)
return escaped_text
@staticmethod
def archive_url(
url: str,
archived_url: str,
text: str | None = None
) -> str:
"""URLをArchiveテンプレートでラップする。
Args:
url (str): ラップするURL。
archive_url (str): ラップするURLの魚拓のURL。
text (str | None, optional): ArchiveテンプレートでURLの代わりに表示する文字列。
Returns:
str: ArchiveタグでラップしたURL。
"""
if text is None:
return '{{Archive|1=' + unquote(url) + '|2=' + archived_url + '}}'
else:
return '{{Archive|1=' + unquote(url) + '|2=' + archived_url \
+ '|3=' + text + '}}'
@staticmethod
def callinshowlink_url(url: str, archived_url: str) -> str:
"""URLをCallinShowLinkテンプレートでラップする。
Args:
url (str): ラップするURL。
archive_url (str): ラップするURLの魚拓のURL。
Returns:
str: CallinShowLinkタグでラップしたURL。
"""
return '{{CallinShowLink|1=' + url + '|2=' + archived_url + '}}'
class FfmpegStatus(Enum):
"""ffmpegでの動画保存ステータス。"""
MP4 = 1
"""mp4の取得に成功したときのステータス。"""
TS = 2
"""tsの取得までは成功したが、mp4への変換に失敗したときのステータス。"""
FAILED = 3
"""m3u8からtsの取得に失敗したときのステータス。"""
class TwitterArchiver:
"""ツイートをWikiの形式にダンプするクラス。
Nitterからツイートを取得し、Wikiの形式にダンプする。
削除されたツイートや編集前のツイートは取得できない。
Important:
Nitterはサービスを終了しており、いずれすべてのインスタンスが使えなくなる [2]_。
本クラスを継承した :class:`~ArchiveCrawler` はarchive.todayからツイートを収集するため引き続き利用可能。
References:
.. [2] Nitter is over - It's been a fun ride. (2024, February 15).
https://nitter.cz
"""
NITTER_INSTANCE: Final[str] = UserProperties.NitterCrawler.nitter_instance
"""Final[str]: Nitterのインスタンス。
生きているのは https://github.com/zedeus/nitter/wiki/Instances で確認。
Note:
末尾にスラッシュ必須。
インスタンスによっては画像の取得ができない。
Tor用のインスタンスでないと動画の取得ができない。
"""
ARCHIVE_TODAY: Final[str] = 'http://archiveiya74codqgiixo33q62qlrqtkgmcitqx5u2oeqnmn5bpcbiyd.onion/' # noqa: E501
"""Final[str]: archive.todayの魚拓のonionドメイン。
ページにアクセスして魚拓があるか調べるのにはonion版のarchive.todayを使用。
Note:
末尾にスラッシュ必須。
"""
ARCHIVE_TODAY_STANDARD: Final[str] = 'https://archive.vn/'
"""Final[str]: archive.todayの魚拓のクリアネットドメイン。
Wiki上の記事にはクリアネット用のarchive.todayリンクを貼る。
Note:
末尾にスラッシュ必須。
"""
TWITTER_URL: Final[str] = 'https://x.com/'
"""Final[str]: TwitterのURL。
Note:
末尾にスラッシュ必須。
"""
TWITTER_MEDIA_URL: Final[str] = 'https://pbs.twimg.com/media/'
"""Final[str]: TwitterのメディアのURL。
"""
INVIDIOUS_INSTANCES_URL: Final[str] = \
'https://api.invidious.io/instances.json'
"""Final[str]: Invidiousのインスタンスのリストを取得するAPIのURL。"""
INVIDIOUS_INSTANCES_TUPLE: Final[tuple[str, ...]] = (
'piped.kavin.rocks',
'piped.video',
'invidious.poast.org'
)
"""Final[tuple[str, ...]]: よく使われるInvidiousインスタンスのリスト。
:const:`~INVIDIOUS_INSTANCES_URL` にアクセスしてもインスタンスが取得できないことがあるため、
それによってURLが置換できないことを防ぐ。
"""
CALLINSHOW: Final[str] = 'CallinShow'
"""Final[str]: 降臨ショーのユーザーネーム。"""
LIMIT_N_TWEETS: Final[int] = UserProperties.NitterCrawler.limit_n_tweets
"""Final[int]: 取得するツイート数の上限。"""
REPORT_INTERVAL: Final[int] = UserProperties.NitterCrawler.report_interval
"""Final[int]: 記録件数を報告するインターバル。"""
TWEETS_OR_REPLIES: Final[str] = 'with_replies'
"""Final[str]: NitterのURLのドメインとユーザーネーム部分の接続部品。"""
NITTER_ERROR_TITLE: Final[str] = 'Error|nitter'
"""Final[str]: Nitterでユーザーがいなかったとき返ってくるページのタイトル。
万が一仕様変更で変わったとき用。
"""
NO_ARCHIVE: Final[str] = 'No results'
"""Final[str]: archive.todayで魚拓がなかったときのレスポンス。
万が一仕様変更で変わったとき用。
"""
NEWEST: Final[str] = 'Load newest'
"""Final[str]: Nitterの前ページ読み込み部分の名前。
万が一仕様変更で変わったとき用。
"""
MEDIA_DIR: Final[str] = UserProperties.media_dir
"""Final[str]: 画像などのツイートメディアをダウンロードするディレクトリ。"""
def __init__(self) -> None:
self._check_constants() # スラッシュが抜けてないかチェック
self._has_ffmpeg: Final[bool] = self._check_ffmpeg() # ffmpegがあるかチェック
self._img_ext_pattern: Final[re.Pattern[str]] = re.compile(
r'%2F([^%]*\.(?:jpg|jpeg|png|gif))')
self._url_fragment_pattern: Final[re.Pattern[str]] = re.compile(
r'#[^#]*$')
self._url_query_pattern: Final[re.Pattern[str]] = re.compile(r'\?.*$')
def _set_queries(
self, accessor: AccessorHandler, krsw: str | None
) -> bool:
"""検索条件を設定する。
:class:`~TwitterArchiver` のメンバをセットし、ツイートを収集するアカウントと
検索クエリ、終わりにするツイートを入力させる。
Args:
accessor (AccessorHandler): アクセスハンドラ
krsw (str | None): `None` でない場合、名前が :const:`~CALLINSHOW` になり、\
クエリと終わりにするツイートが無しになる。\
更に空文字でもない場合、この引数が終わりにするツイートになる。
Returns:
bool: 処理成功時は `True`。
"""
# ユーザー名取得
if krsw is not None:
logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます')
self._name: str = self.CALLINSHOW
else:
name_optional: str | None = self._input_name(accessor)
if name_optional is not None:
self._name: str = name_optional
else:
return False
# 検索クエリとページ取得
self._query_strs: list[str] = []
if krsw is not None:
logger.info('クエリは自動的になしにナリます')
else:
self._input_query()
page_optional: str | None = accessor.request(
urljoin(self.NITTER_INSTANCE, self._name + '/'
+ self.TWEETS_OR_REPLIES))
if page_optional is None:
self._on_fail(accessor)
return False
self._nitter_page: str = page_optional
# Nitter用のCookieをセット
accessor.set_cookies({
'infiniteScroll': '',
'proxyVideos': 'on',
'replaceReddit': '',
'replaceYouTube': '',
})
# 終わりにするツイート取得
if krsw == '':
logger.info('終わりにするツイートは自動的になしにナリます')
self._stop: str = ''
elif krsw is not None:
logger.info(f'終わりにするツイートは自動的に"{krsw}"にナリます')
self._stop: str = krsw
else:
self._stop: str = self._input_stop_word()
logger.info(
'ユーザー名: @' + self._name
+ ', クエリ: ["' + '", "'.join(self._query_strs)
+ '"], 終わりにする文言: "' + self._stop
+ '"で検索しまふ'
)
# 日付取得
tweet_link: Final[Tag | NavigableString | None] = BeautifulSoup(
self._nitter_page, 'html.parser'
).find(class_='tweet-link')
assert isinstance(tweet_link, Tag)
href: Final[str | list[str] | None] = tweet_link.get('href')
assert isinstance(href, str)
date: Final[datetime] = self._tweet_date(
href.split('#')[0]) # フラグメント識別子を除去
self._table_builder: TableBuilder = TableBuilder(date)
return True
def _check_constants(self) -> None:
"""定数のバリデーションを行う。
Returns:
None: すべての対象定数が正しければ `None`。失敗したら例外を出す。
Raises:
AssertionError: バリデーションに違反した場合に出る。
"""
assert self.NITTER_INSTANCE[-1] == '/', 'NITTER_INSTANCEの末尾をには/が必須です'
assert self.ARCHIVE_TODAY[-1] == '/', 'ARCHIVE_TODAYの末尾をには/が必須です'
assert self.ARCHIVE_TODAY_STANDARD[-1] == '/', \
'ARCHIVE_TODAY_STANDARDの末尾をには/が必須です'
assert self.TWITTER_URL[-1] == '/', 'TWITTER_URLの末尾をには/が必須です'
def _check_ffmpeg(self) -> bool:
"""ffmpegがインストールされているかチェックする。
Returns:
bool: ffmpegがインストールされているか。
"""
return shutil.which('ffmpeg') is not None
def _check_nitter_instance(self, accessor: AccessorHandler) -> None:
"""Nitterのインスタンスが生きているかチェックする。
死んでいたらそこで終了。
接続を一回しか試さない :func:`~_request_once` を使っているのは、
激重インスタンスが指定されたとき試行回数増やして偶然成功してそのまま実行されるのを躱すため。
Args:
accessor (AccessorHandler): アクセスハンドラ。
Returns:
None: Nitterにアクセスできれば `None`。できなければ終了。
"""
logger.info('Nitterのインスタンスチェック中ですを')
try:
accessor.request_once(self.NITTER_INSTANCE)
except AccessError:
logger.critical('インスタンスが死んでますを', exc_info=True)
sys.exit(1)
logger.info('Nitter OK')
def _check_archive_instance(self, accessor: AccessorHandler) -> None:
"""archive.todayのTor用インスタンスが生きているかチェックする。
Args:
accessor (AccessorHandler): アクセスハンドラ。
Returns:
None: archive.todayのTorインスタンスにアクセスできれば `None`。できなければ終了。
"""
logger.info('archive.todayのTorインスタンスチェック中ですを')
try:
accessor.request_once(self.ARCHIVE_TODAY)
except AccessError: # エラー発生時は終了
logger.critical('インスタンスが死んでますを', exc_info=True)
sys.exit(1)
logger.info('archive.today OK')
def _invidious_instances(
self, accessor: AccessorHandler
) -> tuple[str, ...]:
"""Invidiousのインスタンスのタプルを取得する。
Args:
accessor (AccessorHandler): アクセスハンドラ。
Returns:
tuple[str, ...]: Invidiousのインスタンスのタプル。インスタンスが死んでいれば終了。
"""
logger.info('Invidiousのインスタンスリストを取得中ですを')
invidious_json: Final[str | None] = (
accessor.request_with_requests_module(self.INVIDIOUS_INSTANCES_URL)
)
if invidious_json is None:
logger.critical('Invidiousが死んでますを')
sys.exit(1)
instance_list: Final[list[str]] = []
for instance_info in json.loads(invidious_json):
instance_list.append(instance_info[0])
# よく使われているものはチェック
for invidious_api in self.INVIDIOUS_INSTANCES_TUPLE:
if invidious_api not in instance_list:
instance_list.append(invidious_api)
logger.debug('Invidiousのインスタンス: [' + ', '.join(instance_list) + ']')
return tuple(instance_list)
def _input_name(self, accessor: AccessorHandler) -> str | None:
"""ツイート収集するユーザー名を標準入力から取得する。
何も入力しないと :const:`~CALLINSHOW` を指定する。
Args:
accessor (AccessorHandler): アクセスハンドラ。
Returns:
str | None: ユーザ名。ユーザページの取得に失敗したら `None`。
"""
while True:
print(
'アカウント名を入れなければない。空白だと自動的に'
+ self.CALLINSHOW
+ 'になりますを')
print('> ', end='')
account_str: str = input()
# 空欄で降臨ショー
if account_str == '':
return self.CALLINSHOW
else:
res: str | None = accessor.request(
urljoin(self.NITTER_INSTANCE, account_str))
if res is None: # リクエスト失敗判定
self._on_fail(accessor)
return None
soup: BeautifulSoup = BeautifulSoup(res, 'html.parser')
if soup.title == self.NITTER_ERROR_TITLE:
print(account_str + 'は実在の人物ではありませんでした')
else:
print('最終的に出会ったのが@' + account_str + 'だった。')
logger.info('@' + account_str + 'をクロールしまふ')
return account_str
def _input_query(self) -> None:
"""検索クエリを標準入力から取得する。"""
print('検索クエリを入れるナリ。複数ある時は一つの塊ごとに入力するナリ。空欄を入れると検索開始ナリ。')
print('例:「無能」→改行→「脱糞」→改行→「弟殺し」→改行→空欄で改行')
print('> ', end='')
query_input: str = input()
# 空欄が押されるまでユーザー入力受付
while query_input != '':
self._query_strs.append(query_input)
print('> ', end='')
query_input = input()
print('クエリのピースが埋まっていく。')
def _on_fail(self, accessor: AccessorHandler) -> None:
"""接続失敗時処理。
取得に成功した分だけファイルにダンプする。
ログにトレースバックも表示する。
Args:
accessor (AccessorHandler): アクセスハンドラ。
"""
logger.critical('接続失敗時処理をしておりまふ', stack_info=True)
logger.critical('最後にアクセスしたURL: ' + accessor.last_url)
print('接続失敗しすぎで強制終了ナリ')
if self._table_builder.count > 0: # 取得成功したデータがあれば発行
print('取得成功した分だけ発行しますを')
self._table_builder.dump_file()
def _input_stop_word(self) -> str:
"""ツイートの記録を中断するための文をユーザに入力させる。
Returns:
str: ツイートの記録を中断するための文。ツイート内容の一文がその文と一致したら記録を中断する。
"""
print(
'ここにツイートの本文を入れる!すると・・・・なんとそれを含むツイートで記録を中断する!(これは本当の仕様です。)'
f'ちなみに、空欄だと{self.LIMIT_N_TWEETS}件まで終了しない。')
print('> ', end='')
end_str: Final[str] = input()
return end_str
def _download_media(
self,
media_url: str,
media_name: str,
accessor: AccessorHandler
) -> bool:
"""ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。
Args:
media_url (str): 画像のURL。
media_name (str): 画像ファイル名。Nitter上のimgタグのsrc属性では、\
``/pic/media%2F`` に後続する。
accessor (AccessorHandler): アクセスハンドラ。
Returns:
bool: 保存に成功したかどうか。
"""
os.makedirs(self.MEDIA_DIR, exist_ok=True)
image_bytes: Final[bytes | None] = accessor.request_image(media_url)
if image_bytes is not None:
Path(self.MEDIA_DIR, media_name).write_bytes(image_bytes)
return True
else:
return False
def _download_m3u8(
self,
media_url: str,
ts_filename: str | None,
mp4_filename: str,
proxies: dict[str, str] | None
) -> FfmpegStatus:
"""ffmpegで動画をダウンロードし、:const:`~MEDIA_DIR` に保存する。
Args:
media_url (str): 動画のm3u8/mp4ファイルのURL。
ts_filename (str | None): m3u8から取得したtsファイルのパス。\
`None` の場合はtsファイルをダウンロードせず、直接mp4をダウンロードする。
mp4_filename (str): 取得したmp4ファイルのパス。
proxies (dict[str, str] | None): ffmpegでの通信に用いるプロキシ設定。
Returns:
FfmpegStatus: ffmpegでの保存ステータス。
"""
os.makedirs(self.MEDIA_DIR, exist_ok=True)
if ts_filename is not None:
ts_returncode: Final[int] = subprocess.run(
[
'ffmpeg', '-y',
'-http_proxy', 'proxies["http"]',
'-i', media_url,
'-c', 'copy', ts_filename
] if proxies is not None else [
'ffmpeg', '-y',
'-i', media_url,
'-c', 'copy', ts_filename
],
stdout=subprocess.DEVNULL).returncode
# 取得成功したらtsをmp4に変換
if ts_returncode == 0:
ts2mp4_returncode: Final[int] = subprocess.run(
[
'ffmpeg', '-y', '-i', ts_filename,
'-acodec', 'copy', '-vcodec', 'copy', mp4_filename
],
stdout=subprocess.DEVNULL
).returncode
if ts2mp4_returncode == 0:
return FfmpegStatus.MP4
else:
return FfmpegStatus.TS
else:
return FfmpegStatus.FAILED
else:
returncode: Final[int] = subprocess.run(
[
'ffmpeg', '-y',
'-http_proxy', 'proxies["http"]',
'-i', media_url,
'-c', 'copy', mp4_filename
] if proxies is not None else [
'ffmpeg', '-y',
'-i', media_url,
'-c', 'copy', mp4_filename
],
stdout=subprocess.DEVNULL).returncode
if returncode == 0:
return FfmpegStatus.MP4
else:
return FfmpegStatus.FAILED
def _tweet_date(self, url: str) -> datetime:
"""ツイートの時刻を取得する。
URLのIDから、Snowflakeの手順を逆算してタイムスタンプを計算している。
https://github.com/twitter-archive/snowflake/blob/b3f6a3c6ca8e1b6847baa6ff42bf72201e2c2231/src/main/scala/com/twitter/service/snowflake/IdWorker.scala#L91
Args:
url (str): ツイートのURL。
Returns:
datetime: ツイートの時刻。
"""
id: int = int(url.split('/')[-1])
timestamp: float = ((id >> 22) + 1_288_834_974_657) / 1000.
return datetime.fromtimestamp(timestamp, tz=ZoneInfo('Asia/Tokyo'))
def _fetch_tweet_media(
self,
tweet: Tag,
tweet_url: str,
accessor: AccessorHandler
) -> str:
"""ツイートの画像や動画を取得する。
Args:
tweet (Tag): ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。
tweet_url (str): ツイートのURL。
accessor (AccessorHandler): アクセスハンドラ。
Returns:
str: Wiki記法でのファイルへのリンクの文字列。
"""
# 引用リツイート内のメディアを選択しないように.tweet-body直下の.attachmentsを選択
tweet_media: Final[Tag | None] = tweet.select_one(
'.tweet-body > .attachments')
media_txt: str = ''
if tweet_media is not None:
media_list: Final[list[str]] = []
# ツイートの画像の取得
for image_a in tweet_media.select('.attachment.image a'):
try:
href: str | list[str] | None = image_a.get('href')
assert isinstance(href, str)
img_matched: re.Match[str] | None = (
self._img_ext_pattern.search(href))
assert img_matched is not None
media_name: str = img_matched.group(1)
media_list.append(f'[[ファイル:{media_name}|240px]]')
if self._download_media(
urljoin(self.TWITTER_MEDIA_URL, media_name),
media_name,
accessor):
logger.info(
os.path.join(self.MEDIA_DIR, media_name)
+ ' をアップロードしなければない。')
else:
logger.info(
urljoin(self.TWITTER_MEDIA_URL, media_name)
+ ' をアップロードしなければない。')
except AttributeError:
logger.error(f'{tweet_url}の画像が取得できませんでしたを 当職無能')
media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]')
# ツイートの動画の取得。ffmpegが入っていなければ手動で取得すること
for i, video_container in enumerate(
tweet_media.select('.attachment.video-container')):
if not self._has_ffmpeg:
logger.warning(f'ffmpegがないため{tweet_url}の動画が取得できませんでしたを')
media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
continue
# videoタグがない場合は取得できない
video: Tag | None = video_container.select_one('video')
if video is None:
logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能')
media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
continue
# videoタグのdata-url属性またはvideoタグ直下のsourceタグからURLが取得できる
data_url: str | list[str] | None = video.get('data-url')
source_tag: Tag | None = video.select_one('source')
src_url: str | list[str] | None = \
source_tag.get('src') if source_tag is not None else None
video_url: str | list[str] | None = data_url or src_url
assert isinstance(video_url, str)
tweet_id: str = tweet_url.split('/')[-1]
if data_url is not None:
# data-url属性からURLを取得した場合
video_matched: re.Match[str] | None = re.search(
r'[^/]+$', video_url)
assert video_matched is not None
media_path: str = unquote(video_matched.group())
media_url: str = urljoin(self.NITTER_INSTANCE, media_path)
ts_filename: str | None = (
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts'
)
else:
# sourceタグからURLを取得した場合
media_url: str = video_url
ts_filename: str | None = None
mp4_filename: str = (
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4'
)
match self._download_m3u8(
media_url,
ts_filename,
mp4_filename,
accessor.proxies):
case FfmpegStatus.MP4:
logger.info(f'{mp4_filename}をアップロードしなければない。')
media_list.append(f'[[ファイル:{tweet_id}_{i}.mp4|240px]]')
case FfmpegStatus.TS:
logger.info(f'{ts_filename}.tsをmp4に変換してアップロードしなければない。')
media_list.append(f'[[ファイル:{tweet_id}_{i}.mp4|240px]]')
case FfmpegStatus.FAILED:
logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能')
media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
case _ as unreachable: # pyright: ignore[reportUnnecessaryComparison] # noqa: E501
assert_never(unreachable)
media_txt = ' '.join(media_list)
return media_txt
def _get_tweet_quote(self, tweet: Tag, accessor: AccessorHandler) -> str:
"""引用リツイートの引用元へのリンクを取得する。
Args:
tweet (Tag): ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。
accessor (AccessorHandler): アクセスハンドラ。
Returns:
str: Archiveテンプレートでラップされた引用元ツイートへのリンク。
"""
tweet_quote: Final[Tag | None] = tweet.select_one(
'.tweet-body > .quote.quote-big') # 引用リツイートを選択
quote_txt: str = ''
if tweet_quote is not None:
quote_link: Final[Tag | None] = (
tweet_quote.select_one('.quote-link'))
assert quote_link is not None
link_href: Final[str | list[str] | None] = quote_link.get('href')
assert isinstance(link_href, str)
link: str = self._url_fragment_pattern.sub('', link_href)
link = urljoin(self.TWITTER_URL, link)
quote_txt = self._archive_url(link, accessor)
tweet_quote_unavailable: Final[Tag | None] = tweet.select_one(
'.tweet-body > .quote.unavailable') # 引用リツイートを選択
if tweet_quote_unavailable is not None:
quote_txt = '(引用元が削除されました)'
return quote_txt
def _get_tweet_poll(self, tweet: Tag) -> str:
"""ツイートの投票結果を取得する。
Args:
tweet (Tag): ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。
Returns:
str: Wiki形式に書き直した投票結果。
"""
tweet_poll: Final[Tag | None] = tweet.select_one('.tweet-body > .poll')
poll_txt: str = ''
if tweet_poll is not None:
poll_meters: Final[ResultSet[Tag]] = tweet_poll.select(
'.poll-meter')
poll_info: Final[Tag | None] = tweet_poll.select_one('.poll-info')
assert poll_info is not None
for poll_meter in poll_meters:
poll_choice_value: Tag | None = poll_meter.select_one(
'.poll-choice-value')
assert poll_choice_value is not None
ratio: str = poll_choice_value.text
poll_choice_option: Tag | None = poll_meter.select_one(
'.poll-choice-option')
assert poll_choice_option is not None
if 'leader' in poll_meter['class']:
poll_txt += ('<br>\n'
' <span style="display: inline-block; '
'width: 30em; background: linear-gradient('
'to right, '
f'rgba(29, 155, 240, 0.58) 0 {ratio}, '
f'transparent {ratio} 100%); '
'font-weight: bold;">') \
+ ratio + ' ' + poll_choice_option.text + '</span>'
else:
poll_txt += ('<br>\n'
' <span style="display: inline-block; '
'width: 30em; background: linear-gradient('
'to right, '
f'rgb(207, 217, 222) 0 {ratio}, '
f'transparent {ratio} 100%);">') \
+ ratio + ' ' + poll_choice_option.text + '</span>'
poll_txt += '<br>\n <span style="font-size: small;">' \
+ poll_info.text + '</span>'
return poll_txt
def _get_timeline_items(self, soup: Tag) -> list[Tag]:
"""タイムラインのツイートを取得。
基本的に投稿時刻の降順に取得し、リプライツリーは最後のツイートの時刻を基準として降順にひとまとまりにする。
Args:
soup (Tag): Nitterのページを表すbeautifulsoup4タグ。
Returns:
list[Tag]: ツイートのアイテムである ``.timeline-item`` タグを表すTagオブジェクトのリスト。
"""
timeline_item_list: Final[list[Tag]] = []
for item_or_list in soup.select(
'.timeline > .timeline-item, .timeline > .thread-line'):
if 'unavailable' in item_or_list.attrs['class']:
continue
elif 'thread-line' in item_or_list.attrs['class']:
# そのままtimeline-itemクラスをfind_allするとツイートの順番が逆転するので、順番通りに取得するよう処理
for item in reversed(item_or_list.select('.timeline-item')):
timeline_item_list.append(item)
else:
timeline_item_list.append(item_or_list)
return timeline_item_list
def _archive_soup(self, tag: Tag, accessor: AccessorHandler) -> None:
"""ツイート内のaタグをテンプレートArchiveの文字列に変化させる。
NitterリンクをYouTubeへのリンクに、bibliogramへのリンクをInstagramへのリンクに修正する。
Args:
tag (Tag): ツイート内容の本文である ``.media-body`` タグを表すbeautifulsoup4タグ。
accessor (AccessorHandler): アクセスハンドラ。
"""
urls_in_tweet: Final[ResultSet[Tag]] = tag.find_all('a')
for url in urls_in_tweet:
href: str | list[str] | None = url.get('href')
assert isinstance(href, str)
if href.startswith('https://') or href.startswith('http://'):
# 先頭にhttpが付いていない物はハッシュタグの検索ページへのリンクなので処理しない
if href.startswith('https' + self.NITTER_INSTANCE[4:]):
# Nitter上のTwitterへのリンクを直す
url_link: str = href.replace(
'https' + self.NITTER_INSTANCE[4:], self.TWITTER_URL)
url_link = self._url_query_pattern.sub('', url_link)
url.replace_with(self._archive_url(url_link, accessor))
elif href.startswith('https://nitter.kavin.rocks/'):
# Nitter上のTwitterへのリンクを直す
url_link: str = href.replace(
'https://nitter.kavin.rocks/', self.TWITTER_URL)
url_link = self._url_query_pattern.sub('', url_link)
url.replace_with(self._archive_url(url_link, accessor))
elif (hasattr(self, '_invidious_pattern')
and self._invidious_pattern.search(href)):
# Nitter上のYouTubeへのリンクをInvidiousのものから直す
invidious_href: str | list[str] | None = (
self._invidious_pattern.sub(
'youtube.com' if (
re.match(r'https://[^/]+/[^/]+/', href)
or re.search(r'/@[^/]*$', href)
) else 'youtu.be',
href))
url.replace_with(self._archive_url(
invidious_href, accessor))
elif href.startswith('https://bibliogram.art/'):
# Nitter上のInstagramへのリンクをBibliogramのものから直す
# Bibliogramは中止されたようなのでそのうちリンクが変わるかも
url_link: str = href.replace(
'https://bibliogram.art/',
'https://www.instagram.com/')
url.replace_with(self._archive_url(url_link, accessor))
else:
url.replace_with(self._archive_url(href, accessor))
elif url.text.startswith('@'):
url_link: str = urljoin(self.TWITTER_URL, href)
url_text: str = url.text
url.replace_with(
self._archive_url(
url_link, accessor, url_text))
def _archive_url(
self,
url: str,
accessor: AccessorHandler,
text: str | None = None
) -> str:
"""URLをArchiveテンプレートでラップする。
フラグメント識別子がURLに含まれていたら、Archive側のURLにも反映させる。
Args:
url (str): ラップするURL。
accessor (AccessorHandler): アクセスハンドラ。
text (str | None, optional): ArchiveテンプレートでURLの代わりに表示する文字列。
Returns:
str: ArchiveタグでラップしたURL。
"""
if '#' in url: # フラグメント識別子の処理
main_url, fragment = url.split('#', maxsplit=1)
return TableBuilder.archive_url(
url, self._archive(main_url, accessor) + '#' + fragment, text)
else:
return TableBuilder.archive_url(
url, self._archive(url, accessor), text)
def _callinshowlink_url(self, url: str, accessor: AccessorHandler) -> str:
"""URLをCallinShowLinkテンプレートでラップする。
Args:
url (str): ラップするURL。
accessor (AccessorHandler): アクセスハンドラ。
Returns:
str: CallinShowLinkタグでラップしたURL。
"""
return TableBuilder.callinshowlink_url(
url, self._archive(url, accessor))
def _archive(self, url: str, accessor: AccessorHandler) -> str:
"""URLから対応するarchive.todayのURLを返す。
取得できれば魚拓ページのURLを返す。
魚拓がなければその旨の警告を表示し、https://archive.ph/https%3A%2F%2Fxxxxxxxx の形で返される。
アクセスに失敗すればその旨の警告を表示し、https://archive.ph/https%3A%2F%2Fxxxxxxxx の形で返される。
Args:
url (str): 魚拓を取得するURL。
accessor (AccessorHandler): アクセスハンドラ。
Returns:
str: 魚拓のURL。
"""
archive_url: str = urljoin(
self.ARCHIVE_TODAY_STANDARD,
quote(unquote(url), safe='&=+?%'))
res: Final[str | None] = accessor.request(urljoin(
self.ARCHIVE_TODAY, quote(unquote(url), safe='&=+?%')))
if res is None: # 魚拓接続失敗時処理
# https://archive.ph/https%3A%2F%2Fxxxxxxxxの形で返される
logger.error(archive_url + 'にアクセス失敗ナリ。出力されるテキストにはそのまま記載されるナリ。')
else:
soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser')
content: Final[Tag | NavigableString | None] = soup.find(
id='CONTENT') # archive.todayの魚拓一覧ページの中身だけ取得
if (content is None or content.get_text()[:len(self.NO_ARCHIVE)]
== self.NO_ARCHIVE): # 魚拓があるかないか判定
logger.warning(url + 'の魚拓がない。これはいけない。')
else:
assert isinstance(content, Tag)
content_a: Final[Tag | NavigableString | None] = (
content.select_one('.TEXT-BLOCK > a')) # 最新の魚拓を取得
assert isinstance(content_a, Tag)
href: Final[str | list[str] | None] = content_a.get('href')
assert isinstance(href, str)
archive_url = href.replace(
self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD)
return archive_url
def _get_tweet(self, accessor: AccessorHandler) -> bool:
"""ページからツイート本文を ``TableBuilder`` インスタンスに収めていく。
ツイートの記録を中断するための文を発見するか、記録したツイート数が :const:`~LIMIT_N_TWEETS` 件に達したら
`False` を返す。
Args:
accessor (AccessorHandler): アクセスハンドラ。
Returns:
bool: 終わりにするツイートを発見するか、記録件数が上限に達したら `False`。
"""
soup: Final[BeautifulSoup] = BeautifulSoup(
self._nitter_page, 'html.parser')
tweets: Final[list[Tag]] = self._get_timeline_items(soup)
for tweet in tweets:
tweet_a: Tag | None = tweet.a
assert tweet_a is not None
if tweet_a.text == self.NEWEST:
# Load Newestのボタンは処理しない
continue
if tweet.find(class_='retweet-header') is not None:
# retweet-headerはリツイートを示すので入っていれば処理しない
continue
if tweet.find(class_='pinned') is not None:
# pinnedは固定ツイートを示すので入っていれば処理しない
continue
if len(self._query_strs) > 0:
# クエリが指定されている場合、一つでも含まないツイートは処理しない
not_match: bool = False
for query_str in self._query_strs:
if query_str not in tweet.text:
not_match = True
break
if not_match:
continue
tweet_link: Tag | NavigableString | None = tweet.find(
class_='tweet-link')
assert isinstance(tweet_link, Tag)
href: str | list[str] | None = tweet_link.get('href')
assert isinstance(href, str)
tweet_url: str = urljoin(
self.TWITTER_URL,
self._url_fragment_pattern.sub('', href))
# 日付の更新処理
date: datetime = self._tweet_date(tweet_url)
self._table_builder.next_day_if_necessary(date)
tweet_callinshow_template: str = self._callinshowlink_url(
tweet_url, accessor)
tweet_content: Tag | NavigableString | None = tweet.find(
class_='tweet-content media-body')
assert isinstance(tweet_content, Tag)
self._archive_soup(tweet_content, accessor)
media_txt: str = self._fetch_tweet_media(
tweet, tweet_url, accessor)
quote_txt: str = self._get_tweet_quote(tweet, accessor)
poll_txt: str = self._get_tweet_poll(tweet)
self._table_builder.append(
tweet_callinshow_template, '<br>\n'.join(
filter(
None,
[
TableBuilder.escape_wiki_reserved_words(
tweet_content.get_text()),
quote_txt, media_txt, poll_txt
])))
if self._table_builder.count % self.REPORT_INTERVAL == 0:
logger.info(
f'ツイートを{self._table_builder.count}件も記録したンゴwwwwwwwwwww')
if self._stop != '' and self._stop in tweet_content.get_text():
logger.info('目的ツイート発見でもう尾張屋根')
self._table_builder.dump_file()
return False
if self._table_builder.count >= self.LIMIT_N_TWEETS:
logger.info(f'{self.LIMIT_N_TWEETS}件も記録している。もうやめにしませんか。')
self._table_builder.dump_file()
return False
return True
def _go_to_new_page(self, accessor: AccessorHandler) -> bool:
"""Nitterで次のページに移動する。
次のページが無ければ `False` を返す。
Args:
accessor (AccessorHandler): アクセスハンドラ。
Returns:
bool: 次のページを取得できれば `True`。
"""
soup: Final[BeautifulSoup] = BeautifulSoup(self._nitter_page,
'html.parser')
show_mores: Final[ResultSet[Tag]] = soup.find_all(class_='show-more')
assert len(show_mores) > 0
new_url: str = ''
for show_more in show_mores: # show-moreに次ページへのリンクか前ページへのリンクがある
show_more_a: Tag | None = show_more.a
assert show_more_a is not None
href: str | list[str] | None = show_more_a.get('href')
assert isinstance(href, str)
new_url = urljoin(
self.NITTER_INSTANCE,
self._name
+ '/'
+ self.TWEETS_OR_REPLIES
+ href) # 直下のaタグのhrefの中身取ってURL頭部分と合体
res: Final[str | None] = accessor.request(new_url)
if res is None:
self._on_fail(accessor)
return False
new_page_soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser')
if new_page_soup.find(class_='timeline-end') is None:
# ツイートの終端ではtimeline-endだけのページになるので判定
logger.info(new_url + 'に移動しますを')
self._nitter_page = res # まだ残りツイートがあるのでページを返して再度ツイート本文収集
return True
else:
logger.info('急に残りツイートが無くなったな終了するか')
self._table_builder.dump_file()
return False
def _signal_handler(
self, signum: int, frame: FrameType | None
) -> NoReturn:
"""ユーザがCtrl + Cでプログラムを止めたときのシグナルハンドラ。
Args:
signum (int): シグナル番号。想定されるのはSIGINTのみ。
frame (FrameType | None): 現在のスタックフレーム。
"""
logger.info('ユーザがプログラムを中止したなりを')
self._table_builder.dump_file()
sys.exit()
@deprecated('\033[31m'
'Nitterはサービスを終了しており、いずれすべてのインスタンスが使えなくなるナリ。'
'--search-unarchived (-u)オプションを外してarchive.todayから収集するモードを'
'使ってね(笑)、それはできるよね。'
'See also: https://nitter.cz'
'\033[0m')
def execute(
self,
krsw: str | None = None,
use_browser: bool = True,
enable_javascript: bool = True
) -> None:
"""通信が必要な部分のロジック。
Args:
krsw (str | None, optional): `None` でない場合、名前が自動で \
:const:`~CALLINSHOW` になり、クエリと終わりにするツイートが自動で無しになる。\
更に空文字でもない場合、この引数が終わりにするツイートになる。
use_browser (bool, optional): `True` ならSeleniumを利用する。\
`False` ならRequestsのみでアクセスする。
enable_javascript (bool, optional): SeleniumでJavaScriptを利用する場合は \
`True`。
.. deprecated:: 4.2.0
Nitterはサービスを終了しており、いずれすべてのインスタンスが使えなくなる [2]_。
:class:`~ArchiveCrawler` はarchive.todayからツイートを収集するため引き続き利用可能。
"""
# Seleniumドライバーを必ず終了するため、with文を利用する。
with AccessorHandler(use_browser, enable_javascript) as accessor:
# 実行前のチェック
self._check_nitter_instance(accessor)
self._check_archive_instance(accessor)
# Invidiousのインスタンスリストの正規表現パターンを取得
invidious_url_tuple: Final[tuple[str, ...]] = (
self._invidious_instances(accessor)
)
self._invidious_pattern: re.Pattern[str] = re.compile(
'|'.join(invidious_url_tuple))
# 検索クエリの設定
if not self._set_queries(accessor, krsw):
sys.exit(1)
# ツイートを取得し終えるまでループ
signal.signal(signal.SIGINT, self._signal_handler)
try:
while True:
if not self._get_tweet(accessor):
break
if not self._go_to_new_page(accessor):
break
except BaseException:
logger.critical('予想外のエラーナリ。ここまでの成果をダンプして終了するナリ。')
self._table_builder.dump_file()
raise
class UrlTuple(NamedTuple):
"""URLとその魚拓のURLのペア。"""
url: str
"""URL。"""
archive_url: str
"""魚拓のURL。"""
class ArchiveCrawler(TwitterArchiver):
"""archive.todayに記録された尊師のツイートのうち、Wiki未掲載のものを収集する。
Warning:
このモードではURLリストの他にツイート本文も整形して取得するナリが、
機能のテストが不十分であることと、TwitterのHTML構造はしばしば変更されることから、
整形後の出力が正しいとは限らないナリ
機能を過信せず、自分の目で確かめてね(笑)、それはできるよね。
Todo:
* ちゃんとテストする。
"""
WIKI_URL: Final[str] = 'https://krsw-wiki.in'
"""Final[str]: WikiのURL。"""
TEMPLATE_URL: Final[str] = WIKI_URL + '/wiki/テンプレート:降臨ショー恒心ログ'
"""Final[str]: テンプレート:降臨ショー恒心ログのURL。"""
URL_LIST_FILENAME: Final[str] = \
UserProperties.ArchiveCrawler.url_list_filename
"""Final[str]: URLのリストをダンプするファイル名。"""
@override
def _set_queries(
self, accessor: AccessorHandler, krsw: str | None
) -> bool:
"""検索条件を設定する。
:class:`~TwitterArchiver` のメンバをセットし、ツイートを収集するアカウントを入力させ、
収集するツイートのうち最古のツイートIDを指定させる。
Args:
accessor (AccessorHandler): アクセスハンドラ
krsw (str | None): `None` でない場合、名前が :const:`~CALLINSHOW` になる。
Returns:
bool: 処理成功時は `True`。
"""
# ユーザー名取得
if krsw is not None:
logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます')
self._name: str = self.CALLINSHOW
else:
name_optional: str | None = self._input_name(accessor)
if name_optional is not None:
self._name: str = name_optional
else:
return False
# 探索対象のうち最古のツイートID取得
oldest_id: str = self._input_oldest_id()
self._oldest_id_queue: deque[int] = deque(map(int, oldest_id)) if \
len(oldest_id) > 0 else deque([0])
self._latest_id_digit: int = self._oldest_id_queue.popleft()
self._oldest_id_queue.append(0) # 空になると予想外の動作を起こしかねないため
self._oldest_url: str = self.TWITTER_URL + self._name + '/status/' \
+ oldest_id
logger.info(
'ユーザー名: @' + self._name
+ ', 最古のURL: ' + self._oldest_url + '*' + 'で検索しまふ'
)
self._twitter_url_pattern: re.Pattern[str] = re.compile(
'^' + self.TWITTER_URL + self._name + r'/status/\d+')
self._archive_rt_pattern: re.Pattern[str] = re.compile(
r'on (?:Twitter|X): "RT @\w+:.+"(?:$| / Twitter$| / X$)')
self._url_list_on_wiki: list[str] = []
self._url_list: list[UrlTuple] = []
return True
def _input_oldest_id(self) -> str:
"""探索対象のうち、最古のツイートのIDを入力させる。
正しい形式のツイートIDまたは空白が入力されるまで繰り返す。
Returns:
str: ツイートIDまたは空文字列。
"""
while True:
print('ツイートを新しい順に取得するので、収集をストップするツイートIDを入力して下さいナリ')
print('空白だと全ツイートを収集しますを')
print('ツイートIDとは、`' + self.TWITTER_URL + self.CALLINSHOW
+ '/status/`の後ろの数字でふ')
print('> ', end='')
oldest_id: str = input()
if re.match(r'^\d*$', oldest_id) is not None:
return oldest_id
else:
print('整数か空白を入力して下さいナリ')
def _get_tweet_urls_from_wiki(self, accessor: AccessorHandler) -> None:
"""Wikiに未掲載のツイートのURLリストを取得する。
Args:
accessor (AccessorHandler): アクセスハンドラ。
"""
def assert_get(tag: Tag, key: str) -> str:
"""BeautifulSoupのタグから属性値を取得する。
Args:
tag (Tag): BeautifulSoupのタグ。
key (str): 属性キー。
Returns:
str: タグの属性値。
"""
result: Final[str | list[str] | None] = tag.get(key)
assert isinstance(result, str)
return result
template_page: Final[str | None] = (
accessor.request_with_requests_module(self.TEMPLATE_URL))
assert template_page is not None
template_soup: Final[BeautifulSoup] = BeautifulSoup(
template_page,
'html.parser')
urls: Final[list[str]] = list(map(
lambda x: self.WIKI_URL + assert_get(x, 'href'),
template_soup.select('.wikitable > tbody > tr > td a')))
for url in urls:
logger.info(f'{unquote(url)} で収集済みツイートを探索中でふ')
page: str | None = accessor.request_with_requests_module(url)
assert page is not None
soup: BeautifulSoup = BeautifulSoup(page, 'html.parser')
url_as = soup.select('tr > th a')
assert len(url_as) > 0
for url_a in url_as:
href: str | list[str] | None = url_a.get('href')
assert isinstance(href, str)
if href.startswith(self.TWITTER_URL + self._name):
self._url_list_on_wiki.append(href)
def _append_tweet_urls(self, soup: Tag) -> None:
"""ツイートのURLを保存する。
リツイートはここで除外する。
Args:
soup (Tag): archive.todayでのURL検索結果のページのオブジェクト。
"""
tweets: Final[ResultSet[Tag]] = soup.select(
'#CONTENT > div > .TEXT-BLOCK')
for tweet in tweets:
# リダイレクトがあればすべてのリンクを、ないなら目的のURLだけが取得できる
urls: list[str] = list(map(
lambda a: a.text, tweet.select('a')[1:]))
# 別のユーザへのリダイレクトがあるものは除く
if len(set(map(lambda url: urlparse(url).path, urls))) != 1:
logger.debug('Found an external redirect ' + str(urls))
continue
url_matched: Final[re.Match[str]] | None = next(
filter(
lambda x: x is not None,
map(lambda url: self._twitter_url_pattern.match(url), urls)
), None
) # 最初にマッチしたURLを返す
if url_matched is not None:
if url_matched.string < self._oldest_url:
logger.debug(url_matched.string + 'は最古の探索対象よりも古いのでポア')
continue
a_first_child: Tag | None = tweet.select_one('a:first-child')
assert a_first_child is not None
archive_url: str | list[str] | None = a_first_child.get('href')
assert isinstance(archive_url, str)
if url_matched[0] not in self._url_list_on_wiki:
# ツイートのURLが未取得のものならばURLを保存する
text_tag: Tag | None = tweet.select_one('a')
assert text_tag is not None
if (self._archive_rt_pattern.search(text_tag.text) is not
None):
logger.debug(url_matched[0] + 'はリツイートなのでポア')
continue
self._url_list.append(
UrlTuple(url_matched[0], archive_url))
self._url_list_on_wiki.append(url_matched[0])
self._url_list.sort(reverse=True, key=lambda x: x.url) # 降順
def _fetch_next_page(
self, soup: Tag, accessor: AccessorHandler
) -> str | None:
"""archive.todayの検索結果のページをpaginateする。
Args:
soup (Tag): archive.todayでのURL検索結果のページのオブジェクト。
accessor (AccessorHandler): アクセスハンドラ。
Returns:
str | None: 次のページがあればそのHTML。
"""
next_a: Final[Tag | None] = soup.select_one('#next')
if next_a is not None:
link: Final[str | list[str] | None] = next_a.get('href')
assert isinstance(link, str)
page: Final[str | None] = accessor.request(link)
assert page is not None
return page
else:
return
def _get_tweet_loop(self, soup: Tag, accessor: AccessorHandler) -> None:
"""archive.todayの検索結果に対して、paginateしながら未記載のツイートURLを記録する。
Args:
soup (Tag): archive.todayでのURL検索結果のページのオブジェクト。
accessor (AccessorHandler): アクセスハンドラ。
"""
has_next: bool = True
while has_next:
self._append_tweet_urls(soup)
next_page: str | None = self._fetch_next_page(soup, accessor)
if next_page is not None:
soup = BeautifulSoup(next_page, 'html.parser')
else:
has_next = False
def _next_url(
self,
accessor: AccessorHandler,
tweet_url_prefix: str,
incremented_num: int,
incremented: bool = False
) -> None:
"""ツイートのURLを、数字部分をインクリメントしながら探索する。
`https://twitter.com/CallinShow/status/` に続く数字部分について、
`tweet_url_prefix` で始まるものを、その次の桁を `incremented_num` から9までインクリメントして探索する。
Args:
accessor (AccessorHandler): アクセスハンドラ。
tweet_url_prefix (str): ツイートURLの数字部分のうち、インクリメントする桁以前の部分。
incremented_num (int): ツイートURLのうちインクリメントする桁の現在の数字。
incremented (bool, optional): `incremented_num` がインクリメント済みの場合True。
Examples:
`https://twitter.com/CallinShow/status/1707` で始まるURLから最新まですべて探索する場合
::
self._next_url(accessor, '1707', 0)
`https://twitter.com/CallinShow/status/165` で始まるURLから最新まですべて探索する場合
::
self._next_url(accessor, '16', 5)
"""
assert 0 <= incremented_num <= 9, \
f'incremented_numが{incremented_num}でふ'
logger.info(self.TWITTER_URL + self._name + '/status/'
+ tweet_url_prefix + str(incremented_num) + '*を探索中')
page: Final[str | None] = accessor.request(
self.ARCHIVE_TODAY
+ self.TWITTER_URL
+ self._name
+ '/status/'
+ tweet_url_prefix
+ str(incremented_num) + '*')
assert page is not None
soup: Final[BeautifulSoup] = BeautifulSoup(page, 'html.parser')
pager: Final[Tag | None] = soup.select_one('#pager')
if pager is not None: # 検索結果が複数ページ
page_num_matched: Final[re.Match[str] | None] = re.search(
r'of (\d+) urls', pager.text)
assert page_num_matched is not None
page_num: Final[int] = int(page_num_matched[1])
if page_num > 100: # ツイート数が100を超えると途中でreCAPTCHAが入るので、もっと細かく検索
# ユーザが最初に"1512"と指定した時、tweet_url_prefix="151"ならincremented_numを2から開始したいが、
# tweet_url_prefix="152"や"160"などならincremented_numを0から開始したい
if incremented:
self._next_url(accessor,
tweet_url_prefix + str(incremented_num), 0,
True)
else:
self._latest_id_digit = self._oldest_id_queue.popleft()
self._oldest_id_queue.append(0) # 空になると予想外の動作を起こしかねないため
self._next_url(accessor,
tweet_url_prefix + str(incremented_num),
self._latest_id_digit)
else:
logger.debug(
self.TWITTER_URL + self._name + '/status/'
+ tweet_url_prefix + str(incremented_num) + '*からURLを収集しまふ')
self._get_tweet_loop(soup, accessor)
else: # 検索結果が1ページだけ
if soup.select_one('.TEXT-BLOCK'): # 検索結果が存在する場合
logger.debug(
self.TWITTER_URL + self._name + '/status/'
+ tweet_url_prefix + str(incremented_num) + '*からURLを収集しまふ')
self._get_tweet_loop(soup, accessor)
# 次のurlを探索
if incremented_num == 9:
return
else:
self._next_url(accessor,
tweet_url_prefix,
incremented_num + 1,
True)
def _parse_images(
self, soup: Tag, accessor: AccessorHandler
) -> tuple[str, ...]:
"""ツイートの魚拓から画像をダウンロードし、ファイル名のタプルを返す。
引用リツイート内の画像や、外部リンクの画像は除く。
Args:
soup (Tag): ツイートの魚拓のタグ。
accessor (AccessorHandler): アクセスハンドラ。
Returns:
tuple[str, ...]: ファイル名のタプル。
"""
image_list: list[str] = []
image_tags: ResultSet[Tag] = soup.select(
'img[alt="Image"]:not(div[role="link"] img[alt="Image"])')
for image_tag in image_tags:
media_url_path: str | list[str] | None = (
image_tag.get('src'))
assert isinstance(media_url_path, str)
original_image_url: str | list[str] | None = (
image_tag.get('new-cursrc'))
assert isinstance(original_image_url, str)
parse_result: ParseResult = urlparse(original_image_url)
original_image_name: str = parse_result.path.split('/')[-1]
original_extension: str = parse_qs(parse_result.query)['format'][0]
original_image_name += '.' + original_extension
self._download_media(
urljoin(self.ARCHIVE_TODAY, media_url_path),
original_image_name,
accessor)
image_list.append(original_image_name)
return tuple(image_list)
def _replace_links(self, tag: Tag, accessor: AccessorHandler) -> Tag:
"""ツイートの魚拓の、本文内のaタグを整形する。
Args:
tag (Tag): aタグを置き換えるべきタグ。
accessor (AccessorHandler): アクセスハンドラ。
Returns:
Tag: テキスト内のaタグが置き換えられたタグ。
"""
internal_a_tags: Final[ResultSet[Tag]] = tag.select(
'div[data-testid="tweetText"]:not(div[role="link"] '
f'div[data-testid="tweetText"]) a[href*="{self.TWITTER_URL}"]')
# Twitter内部のリンク
if len(internal_a_tags) > 0:
for a_tag in internal_a_tags:
account_name: str = a_tag.text
if account_name.startswith('#'):
a_tag.replace_with(a_tag.text)
else:
url: str = urljoin(self.TWITTER_URL, account_name[1:])
a_tag.replace_with(TableBuilder.archive_url(
url,
self._archive(url, accessor),
account_name))
# 通常のリンク
a_tags: Final[ResultSet[Tag]] = tag.select(
'div[dir="auto"] > a:not('
'div[role="link"] div[dir="auto"] > a)')
for a_tag in a_tags:
a_tag.replace_with(
'{{Archive|1=リンクがあります。重複に注意して下さい|2=リンクがあります}}')
return tag
@override
def _get_tweet_poll(self, tweet: Tag) -> str:
card_poll: Tag | None = tweet.select_one('div[data-testid="cardPoll"]')
poll_txt: str = ''
if card_poll is not None:
polls: list[tuple[str, str]] = []
max_ratio: float = 0.
leader_idx: int = 0
tweet_lis = card_poll.select('li')
for i, li in enumerate(tweet_lis):
poll_data: ResultSet[Tag] = li.select('div > span')
text_tag = poll_data[0]
ratio_tag = poll_data[1]
ratio: str = ratio_tag.text
float_ratio: float = float(ratio[0:-1])
if max_ratio < float_ratio:
leader_idx = i
max_ratio = float_ratio
polls.append((text_tag.text, ratio))
for i, poll_result in enumerate(polls):
if i == leader_idx:
poll_txt += (
'<br>\n'
' <span style="display: inline-block; '
'width: 30em; background: linear-gradient('
'to right, '
f'rgba(29, 155, 240, 0.58) 0 {poll_result[1]}, '
f'transparent {poll_result[1]} 100%); '
'font-weight: bold;">'
) + poll_result[1] + ' ' + poll_result[0] + '</span>'
else:
poll_txt += (
'<br>\n'
' <span style="display: inline-block; '
'width: 30em; background: linear-gradient('
'to right, '
f'rgb(207, 217, 222) 0 {poll_result[1]}, '
f'transparent {poll_result[1]} 100%);">'
) + poll_result[1] + ' ' + poll_result[0] + '</span>'
votes_count_tag: Tag | None = card_poll.select_one(
'div[data-testid="cardPoll"] > div')
assert votes_count_tag is not None
poll_txt += '<br>\n <span style="font-size: small;">' \
+ votes_count_tag.text + '</span>'
return poll_txt
@staticmethod
def _retrieve_emojis(tag: Tag) -> Tag:
"""絵文字を画像タグからUnicodeに戻す。
Args:
tag (Tag): 絵文字の含まれる可能性のあるタグ。
Returns:
Tag: 絵文字をUnicodeに戻したタグ。
"""
img_tags: Final[ResultSet[Tag]] = tag.select(
'img[new-cursrc^="https://abs-0.twimg.com/emoji"]')
for img_tag in img_tags:
alt_text: str | list[str] | None = img_tag.get('alt')
assert isinstance(alt_text, str)
img_tag.replace_with(alt_text)
return tag
@staticmethod
def _concat_texts(*texts: str) -> str:
"""2つ以上のテキストを改行タグを挟んで結合する。
Args:
*texts: 入力テキスト。
Returns:
str: 結合されたテキスト。
"""
return '\n'.join(filter(None, texts))
def _get_tweet_from_archive(
self,
url_pairs: list[UrlTuple],
accessor: AccessorHandler
) -> None:
"""魚拓からツイート本文を取得する。
Args:
url_pairs (list[UrlTuple]): URLとその魚拓URLのペアのリスト。
accessor (AccessorHandler): アクセスハンドラ。
"""
table_builder: Final[TableBuilder] = TableBuilder()
for url_pair in url_pairs:
page: str | None = accessor.request(url_pair.archive_url)
assert page is not None
article: Tag | None = BeautifulSoup(
page, 'html.parser'
).select_one('article[tabindex="-1"]')
# リツイート以外のURLを保存する
if article is not None and article.select_one(
'span[data-testid="socialContext"]') is None:
logger.debug(url_pair.url + 'を整形しますを')
tweet_date: datetime = self._tweet_date(url_pair.url)
table_builder.next_day_if_necessary(tweet_date)
tweet_callinshow_template: str = (
TableBuilder.callinshowlink_url(
url_pair.url, url_pair.archive_url.replace(
self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD)))
# 本文
try:
text_tag: Tag | None = article.select_one(
'div[dir="auto"]')
if text_tag is not None:
text_tag = self._retrieve_emojis(text_tag)
text: str = self._replace_links(text_tag,
accessor).text
else:
text: str = ''
# YouTube等のリンク
card_tag: Tag | None = article.select_one(
'div[aria-label="Play"]:not(div[role="link"] '
'div[aria-label="Play"])')
if card_tag is not None:
text = self._concat_texts(
text,
'{{Archive|1=リンクがあります。重複に注意して下さい|2=リンクがあります}}')
# 画像に埋め込まれた外部サイトへのリンク
article_tag: Tag | None = article.select_one(
'a[role="link"][aria-label] img:not(div[role="link"] '
'a[role="link"][aria-label] img)')
if article_tag is not None:
text = self._concat_texts(
text,
'{{Archive|1=リンクがあります。重複に注意して下さい|2=リンクがあります}}')
# 引用の有無のチェック
retweet_tag: Tag | None = article.select_one(
'div[role="link"]')
if retweet_tag is not None:
account_name_tag: Tag | None = (
retweet_tag.select_one(
'div[tabindex="-1"] > div > span:not(:has(> *))')) # noqa: E501
assert account_name_tag is not None
text = self._concat_texts(
text, '{{Archive|1='
+ account_name_tag.text
+ 'のリツイートがあります|2=リツイートがあります}}')
# 画像の取得
image_list: tuple[str, ...] = self._parse_images(
article, accessor)
image_txt: str = ' '.join(map(
lambda t: f'[[ファイル:{t}|240px]]', image_list))
text = self._concat_texts(text, image_txt)
text = TableBuilder.escape_wiki_reserved_words(text)
# 投票の処理
poll_txt: str = self._get_tweet_poll(article)
text = self._concat_texts(text, poll_txt)
# バージョンの処理
possible_version_text_tags: ResultSet[Tag] = (
article.select(
'div > span > '
'span:not(:has(> *)):not(div[role="link"] div > '
'span > span:not(:has(> *)))'))
if len(list(filter(
lambda x: x.text
== 'There’s a new version of this post.',
possible_version_text_tags))) > 0:
tweet_callinshow_template += '([[新しいバージョンがあります|後の版→]])'
except Exception as e:
# エラーが起きても止めない
logger.exception('エラーが発生してツイートが取得できませんでしたを', exc_info=True)
text = 'エラーが発生してツイートが取得できませんでした\n' + ''.join(
TracebackException.from_exception(e).format())
table_builder.append(tweet_callinshow_template, text)
else:
logger.warning(url_pair.url + 'はリツイートなので飛ばすナリ。'
'URLリスト収集の時点でフィルタできなかった、これはいけない。')
table_builder.dump_file()
@override
def execute(
self,
krsw: str | None = None,
use_browser: bool = True,
enable_javascript: bool = True
) -> None:
"""通信が必要な部分のロジック。
Args:
krsw (str | None, optional): `None` でない場合、名前が自動で \
:const:`~CALLINSHOW` になる。
use_browser (bool, optional): `True` ならSeleniumを利用する。\
`False` ならRequestsのみでアクセスする。
enable_javascript (bool, optional): SeleniumでJavaScriptを利用する場合は \
`True`。
"""
logger.info('Wikiに未掲載のツイートのURLを収集しますを')
warnings.warn(
'\033[31m'
'このモードではURLリストの他にツイート本文も整形して取得するナリが、'
'機能のテストが不十分であることと、TwitterのHTML構造は'
'しばしば変更されることから、整形後の出力が正しいとは限らないナリ。'
'機能を過信せず、自分の目で確かめてね(笑)、それはできるよね。'
'\033[0m')
# Seleniumドライバーを必ず終了するため、with文を利用する。
with AccessorHandler(use_browser, enable_javascript) as accessor:
# 実行前のチェック
self._check_archive_instance(accessor)
# 検索クエリの設定
if not self._set_queries(accessor, krsw):
sys.exit(1)
# Wikiに既に掲載されているツイートのURLを取得
self._get_tweet_urls_from_wiki(accessor) # Wikiに接続できない時はここをコメントアウト
# 未掲載のツイートのURLを取得する
self._next_url(accessor, '', self._latest_id_digit)
logger.debug(f'{len(self._url_list)} tweets are missed')
# URL一覧ファイルのダンプ
with Path(self.URL_LIST_FILENAME).open('w', encoding='utf-8') as f:
for url_pair in self._url_list:
f.write(url_pair.url + '\n')
logger.info('URL一覧手に入ったやで〜')
# ツイート本文を取得する
signal.signal(signal.SIGINT, self._signal_handler)
try:
self._get_tweet_from_archive(self._url_list, accessor)
except Exception:
# エラーが起きたらURLのリストをそのまま返す
logger.exception(
'異常が起きたので終了するナリ。'
+ self.URL_LIST_FILENAME + 'を元に自分でツイートを整形してほしいナリ')
if __name__ == '__main__':
if sys.version_info < (3, 12):
logger.critical('貴職のPythonのバージョン: ' + str(sys.version_info))
sys.exit('Pythonのバージョンを3.12以上に上げて下さい')
parser: Final[ArgumentParser] = ArgumentParser()
parser.add_argument(
'--krsw',
type=str,
nargs='?',
const='',
default=None,
help=('指定すると、パカデブのツイートを取得上限数まで取得する。'
'更に--search-unarchivedモードでこのオプションに引数を与えると、'
'その文言が終わりにするツイートになる。'))
parser.add_argument(
'-n',
'--no-browser',
action='store_true',
help='指定すると、Tor Browserを利用しない。')
parser.add_argument(
'-d',
'--disable-script',
action='store_true',
help='指定すると、Tor BrowserでJavaScriptを利用しない。')
parser.add_argument(
'-u',
'--search-unarchived',
action='store_true',
help=('指定すると、従来のNitterからツイートを収集するモードになる。廃止予定。'))
args: Final[Namespace] = parser.parse_args()
logger.debug('args: ' + str(args))
twitter_archiver: Final[TwitterArchiver] = (
TwitterArchiver() if args.search_unarchived else ArchiveCrawler()
)
twitter_archiver.execute(
args.krsw,
not args.no_browser,
not args.disable_script)
実行例
2月9日
https://twitter.com/CallinShow/status/1755640113332449372(魚拓)(後の版→) |
---|
リフォーム屋には要注意。 |
https://twitter.com/CallinShow/status/1755640350868467967(魚拓) |
怪しいリフォーム屋には要注意。 |
https://twitter.com/CallinShow/status/1755725155291246635(魚拓) |
メタは胡散臭い詐欺広告で稼いでいるんだから、何言ってるんだって話だろ。 |
2月10日
https://twitter.com/CallinShow/status/1756265888423051340(魚拓) |
---|
[リンクがあります。重複に注意して下さい リンクがあります。重複に注意して下さい]([リンクがあります 魚拓]) |
2月12日
https://twitter.com/CallinShow/status/1756939496283836878(魚拓) |
---|
これは必読の書だな。 |
https://twitter.com/CallinShow/status/1756992905036927027(魚拓) |
https://twitter.com/CallinShow/status/1756995624837476473(魚拓) |
今日月食? |
https://twitter.com/CallinShow/status/1757048622028657125(魚拓) |
Who is the person who is impersonating me and making criminal threats in the Philippines. Dear Friipinos, this criminal is a person who does not have a job in Japan and just stays at home. We believe that he is not capable of actually committing a crime. I hope that the Philippine and Japanese governments will cooperate to arrest the criminal and that he will be severely punished. |
2月13日
https://twitter.com/CallinShow/status/1757370912171814925(魚拓) |
---|
菊地翔は詐欺師だ。 |
2月14日
https://twitter.com/CallinShow/status/1757763709827776850(魚拓) |
---|
今日は一日布団の中にいた。 |
https://twitter.com/CallinShow/status/1757765745986203848(魚拓) |
2月14日に意味なんかはない。 |
https://twitter.com/CallinShow/status/1757768530165125262(魚拓) |
さあ、自分で買っておいたピエールマルコリーニでも食べるか。 |
2月16日
https://twitter.com/CallinShow/status/1758151064858358179(魚拓) |
---|
ピエールマルコリーニだと思ったら、キットカットを食べていた。 |
https://twitter.com/CallinShow/status/1758155326850048038(魚拓) |
[リンクがあります。重複に注意して下さい リンクがあります。重複に注意して下さい]([リンクがあります 魚拓]) |
https://twitter.com/CallinShow/status/1758155650230812748(魚拓) |
高田馬場のだんご虫 |
https://twitter.com/CallinShow/status/1758164160402395259(魚拓) |
[リンクがあります。重複に注意して下さい リンクがあります。重複に注意して下さい]([リンクがあります 魚拓]) |
https://twitter.com/CallinShow/status/1758166055506358335(魚拓) |
[リンクがあります。重複に注意して下さい リンクがあります。重複に注意して下さい]([リンクがあります 魚拓]) |
https://twitter.com/CallinShow/status/1758166909412081993(魚拓) |
[リンクがあります。重複に注意して下さい リンクがあります。重複に注意して下さい]([リンクがあります 魚拓]) |
https://twitter.com/CallinShow/status/1758167792891937190(魚拓) |
[リンクがあります。重複に注意して下さい リンクがあります。重複に注意して下さい]([リンクがあります 魚拓]) |
https://twitter.com/CallinShow/status/1758171644496134615(魚拓) |
[リンクがあります。重複に注意して下さい リンクがあります。重複に注意して下さい]([リンクがあります 魚拓]) |
2月17日
https://twitter.com/CallinShow/status/1758537053602816220(魚拓) |
---|
オレの投稿に捨て垢で張り付く馬鹿はウケるな。 |
https://twitter.com/CallinShow/status/1758540840874778691(魚拓) |
やりますよ |
https://twitter.com/CallinShow/status/1758542361486094374(魚拓) |
[リンクがあります。重複に注意して下さい リンクがあります。重複に注意して下さい]([リンクがあります 魚拓]) |
https://twitter.com/CallinShow/status/1758557889927737554(魚拓) |
逮捕、送検情報を、テレビが報じるときって、警察が広報すると決めた案件なんだ。 |
https://twitter.com/CallinShow/status/1758569972891316469(魚拓) |
[リンクがあります。重複に注意して下さい リンクがあります。重複に注意して下さい]([リンクがあります 魚拓]) |
https://twitter.com/CallinShow/status/1758570299287843130(魚拓) |
マ・ドンソク主演 |
https://twitter.com/CallinShow/status/1758692104111542606(魚拓) |
https://twitter.com/CallinShow/status/1758702560230396318(魚拓) |
はい! |
https://twitter.com/CallinShow/status/1758721953941291263(魚拓) |
どの株買ったって自分で口外して煽っているのって、馬鹿引っ掛ける手口だよな。 |
https://twitter.com/CallinShow/status/1758723779872870712(魚拓) |
[リンクがあります。重複に注意して下さい リンクがあります。重複に注意して下さい]([リンクがあります 魚拓]) |
https://twitter.com/CallinShow/status/1758724192646930697(魚拓) |
弁護士になっただけで逆転できる人生はない。 |
https://twitter.com/CallinShow/status/1758724799046819840(魚拓) |
昔こうだったから今◯◯になった、はい、立志伝 |
https://twitter.com/CallinShow/status/1758814041370431746(魚拓) |
明日はフェブラリーステークス |