→コード: v4.1.0 wikiに掲載されていないツイートのURLをarchive.isから取得する機能を追加
>Fet-Fe (→コード: v4.0.5 動画を取得できるnitterインスタンスに修正) |
>Fet-Fe (→コード: v4.1.0 wikiに掲載されていないツイートのURLをarchive.isから取得する機能を追加) |
||
7行目: | 7行目: | ||
"""Twitter自動収集スクリプト | """Twitter自動収集スクリプト | ||
ver4.0 | ver4.1.0 2023/10/9恒心 | ||
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です | 当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です | ||
25行目: | 25行目: | ||
``--no_browser`` オプションでTor Browserを使用しないモードに、 | ``--no_browser`` オプションでTor Browserを使用しないモードに、 | ||
``--disable_script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。 | ``--disable_script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。 | ||
``--search_unarchived`` オプションでは、`archive.today <https://archive.today>`_ から | |||
Wikiに未掲載のツイートのURLを収集するモードになります。 | |||
自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。 | 自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。 | ||
30行目: | 33行目: | ||
Note: | Note: | ||
* Pythonのバージョンは3. | * Pythonのバージョンは3.12以上 | ||
* 環境は玉葱前提です。 | * 環境は玉葱前提です。 | ||
68行目: | 71行目: | ||
from time import sleep | from time import sleep | ||
from types import MappingProxyType, TracebackType | from types import MappingProxyType, TracebackType | ||
from typing import Any, Final, NoReturn, Self | from typing import Any, Final, NamedTuple, NoReturn, Self, override | ||
from urllib.parse import quote, unquote, urljoin | from urllib.parse import quote, unquote, urljoin | ||
from zoneinfo import ZoneInfo | from zoneinfo import ZoneInfo | ||
83行目: | 86行目: | ||
from selenium.webdriver.support.wait import WebDriverWait | from selenium.webdriver.support.wait import WebDriverWait | ||
logging.basicConfig( | logging.basicConfig(format='{asctime} [{levelname:.4}] : {message}', style='{') | ||
logger: Final[Logger] = getLogger(__name__) | |||
logger: Logger = getLogger(__name__) | logger.setLevel(logging.INFO) # basicConfigで設定するとモジュールのDEBUGログなども出力される | ||
# おそらくはツイートの内容によってMarkupResemblesLocatorWarningが吐き出されることがあるので無効化 | # おそらくはツイートの内容によってMarkupResemblesLocatorWarningが吐き出されることがあるので無効化 | ||
163行目: | 166行目: | ||
"""コンストラクタ。 | """コンストラクタ。 | ||
""" | """ | ||
self._proxies: dict[str, str] | None = | self._proxies: Final[dict[str, str] | None] = ( | ||
self._choose_tor_proxies() | |||
) # Torに必要なプロキシをセット | |||
def _execute(self, | def _execute(self, | ||
185行目: | 189行目: | ||
sleep(self.WAIT_TIME) # DoS対策で待つ | sleep(self.WAIT_TIME) # DoS対策で待つ | ||
try: | try: | ||
res: requests.models.Response = requests.get( | res: Final[requests.models.Response] = requests.get( | ||
url, | url, | ||
timeout=self.REQUEST_TIMEOUT, | timeout=self.REQUEST_TIMEOUT, | ||
headers=self.HEADERS, | headers=self.HEADERS, | ||
allow_redirects=False, | allow_redirects=False, | ||
proxies=proxies if proxies is not None else self | proxies=proxies if proxies is not None | ||
else getattr(self, '_proxies', None)) | |||
res.raise_for_status() # HTTPステータスコードが200番台以外でエラー発生 | res.raise_for_status() # HTTPステータスコードが200番台以外でエラー発生 | ||
return res | |||
except requests.exceptions.ConnectionError: | except requests.exceptions.ConnectionError: | ||
raise | raise | ||
197行目: | 203行目: | ||
# requestsモジュール固有の例外を共通の例外に変換 | # requestsモジュール固有の例外を共通の例外に変換 | ||
raise AccessError(str(e)) from e | raise AccessError(str(e)) from e | ||
def get(self, | def get(self, | ||
232行目: | 237行目: | ||
Returns: | Returns: | ||
bytes | None: | bytes | None: 画像のバイナリ。画像でなければ `None`。 | ||
""" | """ | ||
try: | try: | ||
res: requests.models.Response = self._execute(url, self._proxies) | res: Final[requests.models.Response] = self._execute(url, | ||
self._proxies) | |||
except (requests.exceptions.ConnectionError, AccessError): | except (requests.exceptions.ConnectionError, AccessError): | ||
raise | raise | ||
247行目: | 253行目: | ||
"""Torを使うのに必要なプロキシ情報を返す。 | """Torを使うのに必要なプロキシ情報を返す。 | ||
プロキシなしで接続できれば `None`、 | |||
Tor Browserのプロキシで接続できるなら :const:`~PROXIES_WITH_BROWSER`、 | Tor Browserのプロキシで接続できるなら :const:`~PROXIES_WITH_BROWSER`、 | ||
torコマンドのプロキシで接続できるなら :const:`~PROXIES_WITH_COMMAND`。 | torコマンドのプロキシで接続できるなら :const:`~PROXIES_WITH_COMMAND`。 | ||
307行目: | 313行目: | ||
TOR_BROWSER_PATHS: MappingProxyType[str, str] = MappingProxyType({ | TOR_BROWSER_PATHS: MappingProxyType[str, str] = MappingProxyType({ | ||
'Windows': 'C: | 'Windows': r'C:\Program Files\Tor Browser\Browser\firefox.exe', | ||
'Darwin': '/Applications/Tor Browser.app/Contents/MacOS/firefox', | 'Darwin': '/Applications/Tor Browser.app/Contents/MacOS/firefox', | ||
'Linux': '/usr/bin/torbrowser' | 'Linux': '/usr/bin/torbrowser' | ||
318行目: | 324行目: | ||
""" | """ | ||
WAIT_TIME_FOR_RECAPTCHA: Final[int] = | WAIT_TIME_FOR_RECAPTCHA: Final[int] = 10_000 | ||
"""Final[int]: reCAPTCHAのための待機時間。 | """Final[int]: reCAPTCHAのための待機時間。 | ||
""" | """ | ||
330行目: | 336行目: | ||
enable_javascript (bool): JavaScriptを有効にするかどうか。reCAPTCHA対策に必要。 | enable_javascript (bool): JavaScriptを有効にするかどうか。reCAPTCHA対策に必要。 | ||
""" | """ | ||
self._javascript_enabled: bool = enable_javascript | self._javascript_enabled: Final[bool] = enable_javascript | ||
options: FirefoxOptions = FirefoxOptions() | options: Final[FirefoxOptions] = FirefoxOptions() | ||
options.binary_location = self.TOR_BROWSER_PATHS[platform.system()] | options.binary_location = self.TOR_BROWSER_PATHS[platform.system()] | ||
347行目: | 353行目: | ||
try: | try: | ||
self._driver: webdriver.Firefox = webdriver.Firefox( | self._driver: Final[webdriver.Firefox] = webdriver.Firefox( | ||
options=options) | options=options) | ||
sleep(1) | sleep(1) | ||
wait_init: WebDriverWait = WebDriverWait(self._driver, | wait_init: Final[WebDriverWait] = WebDriverWait( | ||
self._driver, | |||
self.WAIT_TIME_FOR_INIT) | |||
wait_init.until( # type: ignore | wait_init.until( # type: ignore | ||
ec.element_to_be_clickable((By.ID, 'connectButton')) | ec.element_to_be_clickable((By.ID, 'connectButton')) | ||
368行目: | 375行目: | ||
self._driver.quit() | self._driver.quit() | ||
def _check_recaptcha(self) -> None: | def _check_recaptcha(self, url: str) -> None: | ||
"""reCAPTCHAが表示されているかどうか判定して、入力を待機する。 | """reCAPTCHAが表示されているかどうか判定して、入力を待機する。 | ||
Args: | |||
url (str): アクセスしようとしているURL。 | |||
reCAPTCHAが要求されると `current_url` が変わることがあるので必要。 | |||
Raises: | Raises: | ||
378行目: | 389行目: | ||
""" | """ | ||
try: | try: | ||
self._driver.find_element( | self._driver.find_element(By.ID, 'g-recaptcha') # 要素がない時に例外を吐く | ||
if self._javascript_enabled: | if self._javascript_enabled: | ||
logger.warning('reCAPTCHAを解いてね(笑)、それはできるよね。') | logger.warning(f'{url} でreCAPTCHAが要求されたナリ') | ||
print('reCAPTCHAを解いてね(笑)、それはできるよね。') | |||
print('botバレしたらNew Tor circuit for this siteを選択するナリよ') | |||
WebDriverWait( | WebDriverWait( | ||
self._driver, | self._driver, | ||
self.WAIT_TIME_FOR_RECAPTCHA).until( # type: ignore | self.WAIT_TIME_FOR_RECAPTCHA).until( # type: ignore | ||
ec.staleness_of( | ec.staleness_of( | ||
self._driver.find_element( | self._driver.find_element(By.ID, 'g-recaptcha'))) | ||
sleep(self.WAIT_TIME) # DoS対策で待つ | sleep(self.WAIT_TIME) # DoS対策で待つ | ||
else: | else: | ||
raise ReCaptchaRequiredError( | raise ReCaptchaRequiredError( | ||
'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: ' | f'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: {url}') | ||
except NoSuchElementException: | except NoSuchElementException: | ||
# reCAPTCHAの要素がなければそのまま | # reCAPTCHAの要素がなければそのまま | ||
pass | pass | ||
self._random_sleep() | |||
self._driver.get(url) | |||
def get(self, url: str) -> str: | def get(self, url: str) -> str: | ||
415行目: | 424行目: | ||
try: | try: | ||
self._driver.get(url) | self._driver.get(url) | ||
self._check_recaptcha() | self._check_recaptcha(url) | ||
except WebDriverException as e: | except WebDriverException as e: | ||
# Selenium固有の例外を共通の例外に変換 | # Selenium固有の例外を共通の例外に変換 | ||
442行目: | 451行目: | ||
Args: | Args: | ||
use_browser (bool): | use_browser (bool): `True` ならSeleniumを利用する。 | ||
enable_javascript (bool): | `False` ならRequestsのみでアクセスする。 | ||
enable_javascript (bool): SeleniumでJavaScriptを利用する場合は `True`。 | |||
""" | """ | ||
self._selenium_accessor: SeleniumAccessor | None = | self._selenium_accessor: Final[SeleniumAccessor | None] = ( | ||
enable_javascript) if use_browser else None | SeleniumAccessor(enable_javascript) if use_browser else None | ||
self._requests_accessor: RequestsAccessor = RequestsAccessor() | ) | ||
self._requests_accessor: Final[RequestsAccessor] = RequestsAccessor() | |||
def __enter__(self) -> Self: | def __enter__(self) -> Self: | ||
""" | """`with` ブロックの開始時に実行する。 | ||
Returns: | Returns: | ||
461行目: | 472行目: | ||
exc_value: BaseException | None, | exc_value: BaseException | None, | ||
traceback: TracebackType | None) -> None: | traceback: TracebackType | None) -> None: | ||
""" | """`with` ブロックの終了時に実行する。 | ||
Args: | Args: | ||
494行目: | 505行目: | ||
raise | raise | ||
def _request_with_callable(self, | def _request_with_callable( | ||
self, | |||
url: str, | |||
request_callable: Callable[[str], Any]) -> Any | None: | |||
""" | """`request_callable` の実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | ||
成功すると結果を返す。 | 成功すると結果を返す。 | ||
接続失敗が何度も起きると `None` を返す。 | |||
Args: | Args: | ||
508行目: | 519行目: | ||
Returns: | Returns: | ||
Any | None: | Any | None: レスポンス。接続失敗が何度も起きると `None` を返す。 | ||
""" | """ | ||
logger.debug('Requesting ' + unquote(url)) | |||
for i in range(1, self.LIMIT_N_REQUESTS + 1): | for i in range(1, self.LIMIT_N_REQUESTS + 1): | ||
try: | try: | ||
res: Any = request_callable(url) | res: Final[Any] = request_callable(url) | ||
except AccessError: | except AccessError: | ||
logger.warning( | logger.warning( | ||
528行目: | 540行目: | ||
成功すると結果を返す。 | 成功すると結果を返す。 | ||
接続失敗が何度も起きると `None` を返す。 | |||
Args: | Args: | ||
url (str): | url (str): 接続するURL。 | ||
Returns: | Returns: | ||
str | None: | str | None: レスポンスのテキスト。接続失敗が何度も起きると `None` を返す。 | ||
Note: | Note: | ||
545行目: | 557行目: | ||
成功すると結果を返す。 | 成功すると結果を返す。 | ||
接続失敗が何度も起きると `None` を返す。 | |||
Args: | Args: | ||
url (str): | url (str): 接続するURL。 | ||
Returns: | Returns: | ||
str | None: | str | None: レスポンスのテキスト。接続失敗が何度も起きると `None` を返す。 | ||
Note: | Note: | ||
562行目: | 574行目: | ||
成功すると結果を返す。 | 成功すると結果を返す。 | ||
接続失敗が何度も起きると `None` を返す。 | |||
Args: | Args: | ||
url (str): | url (str): 接続するURL。 | ||
Returns: | Returns: | ||
bytes | None: | bytes | None: レスポンスのバイト列。接続失敗が何度も起きると `None` を返します。 | ||
Note: | Note: | ||
失敗かどうかは呼出側で要判定 | 失敗かどうかは呼出側で要判定 | ||
""" | """ | ||
return self._request_with_callable(url, | return self._request_with_callable( | ||
url, | |||
self._requests_accessor.get_image) | |||
@property | @property | ||
589行目: | 602行目: | ||
"""Wikiの表を組み立てるためのクラス。 | """Wikiの表を組み立てるためのクラス。 | ||
""" | """ | ||
FILENAME: Final[str] = 'tweet.txt' | |||
"""Final[str]: ツイートを保存するファイルの名前。 | |||
""" | |||
def __init__(self, date: datetime) -> None: | def __init__(self, date: datetime) -> None: | ||
"""コンストラクタ。 | """コンストラクタ。 | ||
608行目: | 626行目: | ||
return self._count | return self._count | ||
def append(self, | def append(self, callinshow_template: str, text: str) -> None: | ||
"""ツイートを表に追加する。 | """ツイートを表に追加する。 | ||
Args: | Args: | ||
callinshow_template (str): ツイートのURLをCallinshowLinkテンプレートに入れたもの。 | |||
text (str): ツイートの本文。 | text (str): ツイートの本文。 | ||
""" | """ | ||
self._tables[-1] = '!' + | self._tables[-1] = '!' + callinshow_template + '\n|-\n|\n' \ | ||
+ text \ | + text \ | ||
+ '\n|-\n' \ | + '\n|-\n' \ | ||
627行目: | 645行目: | ||
result_txt: Final[str] = '\n'.join(reversed(self._tables)) | result_txt: Final[str] = '\n'.join(reversed(self._tables)) | ||
with codecs.open( | with codecs.open(self.FILENAME, 'w', 'utf-8') as f: | ||
f.write(result_txt) | f.write(result_txt) | ||
logger.info('テキストファイル手に入ったやで〜') | logger.info('テキストファイル手に入ったやで〜') | ||
750行目: | 768行目: | ||
return '{{Archive|1=' + unquote(url) + '|2=' + archived_url + '}}' | return '{{Archive|1=' + unquote(url) + '|2=' + archived_url + '}}' | ||
else: | else: | ||
return '{{Archive|1=' + | return '{{Archive|1=' + unquote(url) + '|2=' + archived_url \ | ||
+ '|3=' + text + '}}' | |||
@staticmethod | @staticmethod | ||
827行目: | 845行目: | ||
TWITTER_MEDIA_URL: Final[str] = 'https://pbs.twimg.com/media/' | TWITTER_MEDIA_URL: Final[str] = 'https://pbs.twimg.com/media/' | ||
"""Final[str]: TwitterのメディアのURL。 | """Final[str]: TwitterのメディアのURL。 | ||
""" | |||
INVIDIOUS_INSTANCES_URL: Final[str] = 'https://api.invidious.io/instances.json' | |||
"""Final[str]: Invidiousのインスタンスのリストを取得するAPIのURL。 | |||
""" | |||
INVIDIOUS_INSTANCES_TUPLE: tuple[str, ...] = ( | |||
'piped.kavin.rocks', | |||
'piped.video' | |||
) | |||
"""tuple[str, ...]: よく使われるInvidiousインスタンスのリスト。 | |||
:const:`~INVIDIOUS_INSTANCES_URL` にアクセスしてもインスタンスが取得できないことがあるため、 | |||
それによってURLが置換できないことを防ぐ。 | |||
""" | """ | ||
867行目: | 899行目: | ||
""" | """ | ||
def __init__(self) -> None: | def __init__(self) -> None | NoReturn: | ||
"""コンストラクタ。 | """コンストラクタ。 | ||
""" | """ | ||
self._check_slash() # スラッシュが抜けてないかチェック | self._check_slash() # スラッシュが抜けてないかチェック | ||
self._has_ffmpeg: bool = self._check_ffmpeg() # ffmpegがあるかチェック | self._has_ffmpeg: Final[bool] = self._check_ffmpeg() # ffmpegがあるかチェック | ||
def _set_queries(self, accessor: AccessorHandler, krsw: bool) -> | def _set_queries(self, accessor: AccessorHandler, krsw: bool) -> bool: | ||
"""検索条件を設定する。 | """検索条件を設定する。 | ||
883行目: | 915行目: | ||
krsw (bool): Trueの場合、名前が :const:`~CALLINSHOW` になり、 | krsw (bool): Trueの場合、名前が :const:`~CALLINSHOW` になり、 | ||
クエリと終わりにするツイートが無しになる。 | クエリと終わりにするツイートが無しになる。 | ||
Returns: | |||
bool: 処理成功時は `True`。 | |||
""" | """ | ||
888行目: | 923行目: | ||
if krsw: | if krsw: | ||
logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます') | logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます') | ||
self._name: str = self.CALLINSHOW | |||
else: | |||
name_optional: str | None = self._get_name(accessor) | |||
if name_optional is not None: | |||
self._name: str = name_optional | |||
else: | |||
return False | |||
# 検索クエリとページ取得 | # 検索クエリとページ取得 | ||
901行目: | 942行目: | ||
if page_optional is None: | if page_optional is None: | ||
self._on_fail() | self._on_fail() | ||
return False | |||
self._page: str = page_optional | self._page: str = page_optional | ||
907行目: | 949行目: | ||
logger.info('終わりにするツイートは自動的になしにナリます') | logger.info('終わりにするツイートは自動的になしにナリます') | ||
self._stop: str = '' if krsw else self._stop_word() | self._stop: str = '' if krsw else self._stop_word() | ||
logger.info( | |||
'ユーザー名: @' + self._name | |||
+ ', クエリ: ["' + '", "'.join(self._query_strs) | |||
+ '"], 終わりにする文言: "' + self._stop | |||
+ '"で検索しまふ' | |||
) | |||
# 日付取得 | # 日付取得 | ||
916行目: | 965行目: | ||
self._table_builder: TableBuilder = TableBuilder(date) | self._table_builder: TableBuilder = TableBuilder(date) | ||
return True | |||
def _check_slash(self) -> None | NoReturn: | def _check_slash(self) -> None | NoReturn: | ||
"""URLの最後にスラッシュが付いていなければエラーを出す。 | """URLの最後にスラッシュが付いていなければエラーを出す。 | ||
Returns: | Returns: | ||
None | NoReturn: | None | NoReturn: すべてのURLが正しければ `None`。失敗したら例外を出す。 | ||
Raises: | Raises: | ||
941行目: | 990行目: | ||
bool: ffmpegがインストールされているか。 | bool: ffmpegがインストールされているか。 | ||
""" | """ | ||
return subprocess.run(['which', 'ffmpeg'], | return subprocess.run( | ||
['which', 'ffmpeg'], | |||
stdout=subprocess.DEVNULL, | |||
stderr=subprocess.DEVNULL).returncode == 0 | |||
def _check_nitter_instance( | def _check_nitter_instance( | ||
954行目: | 1,004行目: | ||
Args: | Args: | ||
accessor (AccessorHandler): | accessor (AccessorHandler): アクセスハンドラ。 | ||
Returns: | Returns: | ||
None | NoReturn: | None | NoReturn: Nitterにアクセスできれば `None`。できなければ終了。 | ||
""" | """ | ||
logger.info('Nitterのインスタンスチェック中ですを') | logger.info('Nitterのインスタンスチェック中ですを') | ||
973行目: | 1,023行目: | ||
Args: | Args: | ||
accessor (AccessorHandler): | accessor (AccessorHandler): アクセスハンドラ。 | ||
Returns: | Returns: | ||
None | NoReturn: archive. | None | NoReturn: archive.todayのTorインスタンスにアクセスできれば `None`。できなければ終了。 | ||
""" | """ | ||
logger.info('archive.todayのTorインスタンスチェック中ですを') | logger.info('archive.todayのTorインスタンスチェック中ですを') | ||
993行目: | 1,043行目: | ||
Args: | Args: | ||
accessor (AccessorHandler): | accessor (AccessorHandler): アクセスハンドラ。 | ||
Returns: | Returns: | ||
1,000行目: | 1,050行目: | ||
logger.info('Invidiousのインスタンスリストを取得中ですを') | logger.info('Invidiousのインスタンスリストを取得中ですを') | ||
invidious_json: Final[str | None] = ( | invidious_json: Final[str | None] = ( | ||
accessor.request_with_requests_module( | accessor.request_with_requests_module(self.INVIDIOUS_INSTANCES_URL) | ||
) | ) | ||
if invidious_json is None: | if invidious_json is None: | ||
1,011行目: | 1,060行目: | ||
# よく使われているものはチェック | # よく使われているものはチェック | ||
if | for invidious_api in self.INVIDIOUS_INSTANCES_TUPLE: | ||
if invidious_api not in instance_list: | |||
instance_list.append(invidious_api) | |||
logger.debug('Invidiousのインスタンス: [' + ', '.join(instance_list) + ']') | |||
return tuple(instance_list) | return tuple(instance_list) | ||
def _get_name(self, accessor: AccessorHandler) -> str | | def _get_name(self, accessor: AccessorHandler) -> str | None: | ||
"""ツイート収集するユーザー名を標準入力から取得する。 | """ツイート収集するユーザー名を標準入力から取得する。 | ||
1,026行目: | 1,075行目: | ||
Returns: | Returns: | ||
str | | str | None: ユーザ名。ユーザページの取得に失敗したら `None`。 | ||
""" | """ | ||
while True: | while True: | ||
print( | |||
'アカウント名を入れなければない。空白だと自動的に' | 'アカウント名を入れなければない。空白だと自動的に' | ||
+ self.CALLINSHOW | + self.CALLINSHOW | ||
+ 'になりますを') | + 'になりますを') | ||
account_str: str = input() | account_str: Final[str] = input() | ||
# 空欄で降臨ショー | # 空欄で降臨ショー | ||
if account_str == '': | if account_str == '': | ||
1,042行目: | 1,091行目: | ||
if res is None: # リクエスト失敗判定 | if res is None: # リクエスト失敗判定 | ||
self._on_fail() | self._on_fail() | ||
soup: BeautifulSoup = BeautifulSoup(res, 'html.parser') | return None | ||
soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') | |||
if soup.title == self.NITTER_ERROR_TITLE: | if soup.title == self.NITTER_ERROR_TITLE: | ||
print(account_str + 'は実在の人物ではありませんでした') | |||
else: | else: | ||
logger.info(' | print('最終的に出会ったのが@' + account_str + 'だった。') | ||
logger.info('@' + account_str + 'をクロールしまふ') | |||
return account_str | return account_str | ||
1,052行目: | 1,103行目: | ||
"""検索クエリを標準入力から取得する。 | """検索クエリを標準入力から取得する。 | ||
""" | """ | ||
print('検索クエリを入れるナリ。複数ある時は一つの塊ごとに入力するナリ。空欄を入れると検索開始ナリ。') | |||
print('例:「無能」→改行→「脱糞」→改行→「弟殺し」→改行→空欄で改行') | |||
query_input: str = input() | query_input: str = input() | ||
# 空欄が押されるまでユーザー入力受付 | # 空欄が押されるまでユーザー入力受付 | ||
1,059行目: | 1,110行目: | ||
self._query_strs.append(query_input) | self._query_strs.append(query_input) | ||
query_input = input() | query_input = input() | ||
print('クエリのピースが埋まっていく。') | |||
def _on_fail(self) -> | def _on_fail(self) -> None: | ||
"""接続失敗時処理。 | """接続失敗時処理。 | ||
取得に成功した分だけファイルにダンプする。 | |||
""" | """ | ||
logger.critical('接続失敗しすぎで強制終了ナリ') | logger.critical('接続失敗時処理をしておりまふ') | ||
print('接続失敗しすぎで強制終了ナリ') | |||
if self._table_builder.count > 0: # 取得成功したデータがあれば発行 | if self._table_builder.count > 0: # 取得成功したデータがあれば発行 | ||
print('取得成功した分だけ発行しますを') | |||
self._table_builder.dump_file() | self._table_builder.dump_file() | ||
def _stop_word(self) -> str: | def _stop_word(self) -> str: | ||
1,078行目: | 1,129行目: | ||
str: ツイートの記録を中断するための文。ツイート内容の一文がその文と一致したら記録を中断する。 | str: ツイートの記録を中断するための文。ツイート内容の一文がその文と一致したら記録を中断する。 | ||
""" | """ | ||
print( | |||
'ここにツイートの本文を入れる!すると・・・・なんとそれを含むツイートで記録を中断する!(これは本当の仕様です。)' | 'ここにツイートの本文を入れる!すると・・・・なんとそれを含むツイートで記録を中断する!(これは本当の仕様です。)' | ||
f'ちなみに、空欄だと{self.LIMIT_N_TWEETS}件まで終了しない。') | |||
end_str: Final[str] = input() | end_str: Final[str] = input() | ||
return end_str | return end_str | ||
1,086行目: | 1,137行目: | ||
def _download_media( | def _download_media( | ||
self, | self, | ||
media_url: str, | |||
media_name: str, | media_name: str, | ||
accessor: AccessorHandler) -> bool: | accessor: AccessorHandler) -> bool: | ||
1,091行目: | 1,143行目: | ||
Args: | Args: | ||
media_url (str): 画像のURL。 | |||
media_name (str): 画像ファイル名。Nitter上のimgタグのsrc属性では、 | media_name (str): 画像ファイル名。Nitter上のimgタグのsrc属性では、 | ||
``/pic/media%2F`` に後続する。 | ``/pic/media%2F`` に後続する。 | ||
1,099行目: | 1,152行目: | ||
""" | """ | ||
os.makedirs(self.MEDIA_DIR, exist_ok=True) | os.makedirs(self.MEDIA_DIR, exist_ok=True) | ||
image_bytes: Final[bytes | None] = accessor.request_image(media_url) | |||
image_bytes: Final[bytes | None] = accessor.request_image( | |||
if image_bytes is not None: | if image_bytes is not None: | ||
with open(os.path.join(self.MEDIA_DIR, media_name), 'wb') as f: | with open(os.path.join(self.MEDIA_DIR, media_name), 'wb') as f: | ||
1,108行目: | 1,160行目: | ||
return False | return False | ||
def _download_m3u8(self, | def _download_m3u8( | ||
self, | |||
media_path: str, | |||
ts_filename: str, | |||
mp4_filename: str, | |||
proxies: dict[str, str] | None) -> FfmpegStatus: | |||
"""ffmpegで動画をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | """ffmpegで動画をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | ||
1,124行目: | 1,177行目: | ||
FfmpegStatus: ffmpegでの保存ステータス。 | FfmpegStatus: ffmpegでの保存ステータス。 | ||
""" | """ | ||
returncode: Final[int] = subprocess.run( | |||
[ | |||
'ffmpeg', '-y', | |||
'-http_proxy', 'proxies["http"]', | |||
'-i', urljoin(self.NITTER_INSTANCE, media_path), | |||
'-c', 'copy', ts_filename | |||
] if proxies is not None else [ | |||
'ffmpeg', '-y', | |||
'-i', urljoin(self.NITTER_INSTANCE, media_path), | |||
'-c', 'copy', ts_filename | |||
], | |||
stdout=subprocess.DEVNULL).returncode | |||
# 取得成功したらtsをmp4に変換 | # 取得成功したらtsをmp4に変換 | ||
if returncode == 0: | if returncode == 0: | ||
ts2mp4_returncode: int = subprocess.run( | ts2mp4_returncode: Final[int] = subprocess.run( | ||
[ | [ | ||
'ffmpeg', '-y', '-i', ts_filename, | 'ffmpeg', '-y', '-i', ts_filename, | ||
1,179行目: | 1,227行目: | ||
ZoneInfo('Asia/Tokyo')) | ZoneInfo('Asia/Tokyo')) | ||
def | def _fetch_tweet_media( | ||
self, | self, | ||
tweet: Tag, | tweet: Tag, | ||
tweet_url: str, | |||
accessor: AccessorHandler) -> str: | accessor: AccessorHandler) -> str: | ||
"""ツイートの画像や動画を取得する。 | """ツイートの画像や動画を取得する。 | ||
1,187行目: | 1,236行目: | ||
Args: | Args: | ||
tweet (Tag): ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | tweet (Tag): ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | ||
accessor (AccessorHandler): | tweet_url (str): ツイートのURL。 | ||
accessor (AccessorHandler): アクセスハンドラ。 | |||
Returns: | Returns: | ||
1,193行目: | 1,243行目: | ||
""" | """ | ||
# 引用リツイート内のメディアを選択しないように.tweet-body直下の.attachmentsを選択 | # 引用リツイート内のメディアを選択しないように.tweet-body直下の.attachmentsを選択 | ||
tweet_media: Tag | None = tweet.select_one( | tweet_media: Final[Tag | None] = tweet.select_one( | ||
'.tweet-body > .attachments') | '.tweet-body > .attachments') | ||
media_txt: str = '' | media_txt: str = '' | ||
1,204行目: | 1,254行目: | ||
assert isinstance(href, str) | assert isinstance(href, str) | ||
matched: Match[str] | None = re.search( | matched: Match[str] | None = re.search( | ||
r'%2F([^%]*\.jpg | r'%2F([^%]*\.(?:jpg|jpeg|png|gif))', | ||
href) | href) | ||
assert matched is not None | assert matched is not None | ||
media_name: str = | media_name: Final[str] = matched.group(1) | ||
media_list.append(f'[[ファイル:{media_name}|240px]]') | media_list.append(f'[[ファイル:{media_name}|240px]]') | ||
if self._download_media(media_name, accessor): | if self._download_media( | ||
urljoin(self.TWITTER_MEDIA_URL, media_name), | |||
media_name, | |||
accessor): | |||
logger.info( | logger.info( | ||
os.path.join(self.MEDIA_DIR, media_name) | os.path.join(self.MEDIA_DIR, media_name) | ||
1,219行目: | 1,271行目: | ||
+ ' をアップロードしなければない。') | + ' をアップロードしなければない。') | ||
except AttributeError: | except AttributeError: | ||
logger.exception(f'{tweet_url}の画像が取得できませんでしたを 当職無能') | |||
logger. | |||
media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]') | media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]') | ||
1,233行目: | 1,277行目: | ||
for i, video_container in enumerate( | for i, video_container in enumerate( | ||
tweet_media.select('.attachment.video-container')): | tweet_media.select('.attachment.video-container')): | ||
if not self._has_ffmpeg: | if not self._has_ffmpeg: | ||
logger. | logger.warn(f'ffmpegがないため{tweet_url}の動画が取得できませんでしたを') | ||
media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]') | media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]') | ||
continue | continue | ||
# videoタグがない場合は取得できない | # videoタグがない場合は取得できない | ||
video = video_container.select_one('video') | video: Final[Tag | None] = video_container.select_one('video') | ||
if video is None: | if video is None: | ||
logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能') | logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能') | ||
1,254行目: | 1,289行目: | ||
continue | continue | ||
data_url: str | list[str] | None = video.get('data-url') | data_url: Final[str | list[str] | None] = video.get('data-url') | ||
assert isinstance(data_url, str) | assert isinstance(data_url, str) | ||
matched: Match[str] | None = re.search(r'[^\/]+$', data_url) | matched: Match[str] | None = re.search(r'[^\/]+$', data_url) | ||
assert matched is not None | assert matched is not None | ||
media_path: str = unquote(matched.group( | media_path: Final[str] = unquote(matched.group()) | ||
tweet_id: str = tweet_url.split('/')[-1] | tweet_id: Final[str] = tweet_url.split('/')[-1] | ||
ts_filename: str = ( | ts_filename: Final[str] = ( | ||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts' | f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts' | ||
) | ) | ||
mp4_filename: str = ( | mp4_filename: Final[str] = ( | ||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4' | f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4' | ||
) | ) | ||
1,324行目: | 1,359行目: | ||
str: Wiki形式に書き直した投票結果。 | str: Wiki形式に書き直した投票結果。 | ||
""" | """ | ||
tweet_poll: Final[Tag | None] = tweet.select_one( | tweet_poll: Final[Tag | None] = tweet.select_one('.tweet-body > .poll') | ||
poll_txt: str = '' | poll_txt: str = '' | ||
if tweet_poll is not None: | if tweet_poll is not None: | ||
1,347行目: | 1,381行目: | ||
f'rgba(29, 155, 240, 0.58) 0 {ratio}, ' | f'rgba(29, 155, 240, 0.58) 0 {ratio}, ' | ||
f'transparent {ratio} 100%); ' | f'transparent {ratio} 100%); ' | ||
'font-weight: bold;">') | 'font-weight: bold;">') \ | ||
ratio + ' ' + poll_choice_option.text + '</span>' | + ratio + ' ' + poll_choice_option.text + '</span>' | ||
else: | else: | ||
poll_txt += ('<br>\n' | poll_txt += ('<br>\n' | ||
1,355行目: | 1,389行目: | ||
'to right, ' | 'to right, ' | ||
f'rgb(207, 217, 222) 0 {ratio}, ' | f'rgb(207, 217, 222) 0 {ratio}, ' | ||
f'transparent {ratio} 100%);">') | f'transparent {ratio} 100%);">') \ | ||
ratio + ' ' + poll_choice_option.text + '</span>' | + ratio + ' ' + poll_choice_option.text + '</span>' | ||
poll_txt += '<br>\n <span style="font-size: small;">' | poll_txt += '<br>\n <span style="font-size: small;">' \ | ||
poll_info.text + '</span>' | + poll_info.text + '</span>' | ||
return poll_txt | return poll_txt | ||
def _get_timeline_items(self, | def _get_timeline_items( | ||
self, | |||
soup: BeautifulSoup) -> list[Tag]: | |||
"""タイムラインのツイートを取得。 | """タイムラインのツイートを取得。 | ||
1,396行目: | 1,431行目: | ||
Args: | Args: | ||
tag (Tag): ツイート内容の本文である ``.media-body`` タグを表すbeautifulsoup4タグ。 | tag (Tag): ツイート内容の本文である ``.media-body`` タグを表すbeautifulsoup4タグ。 | ||
accessor (AccessorHandler): | accessor (AccessorHandler): アクセスハンドラ。 | ||
""" | """ | ||
urls_in_tweet: Final[ResultSet[Tag]] = tag.find_all( | urls_in_tweet: Final[ResultSet[Tag]] = tag.find_all('a') | ||
for url in urls_in_tweet: | for url in urls_in_tweet: | ||
href: str | list[str] | None = url.get('href') | href: str | list[str] | None = url.get('href') | ||
1,410行目: | 1,444行目: | ||
url_link: str = href.replace( | url_link: str = href.replace( | ||
'https' + self.NITTER_INSTANCE[4:], self.TWITTER_URL) | 'https' + self.NITTER_INSTANCE[4:], self.TWITTER_URL) | ||
url_link = re.sub(' | url_link = re.sub(r'\?.*$', '', url_link) | ||
url.replace_with(self._archive_url(url_link, accessor)) | url.replace_with(self._archive_url(url_link, accessor)) | ||
elif href.startswith('https://nitter.kavin.rocks/'): | elif href.startswith('https://nitter.kavin.rocks/'): | ||
1,416行目: | 1,450行目: | ||
url_link: str = href.replace( | url_link: str = href.replace( | ||
'https://nitter.kavin.rocks/', self.TWITTER_URL) | 'https://nitter.kavin.rocks/', self.TWITTER_URL) | ||
url_link = re.sub(' | url_link = re.sub(r'\?.*$', '', url_link) | ||
url.replace_with(self._archive_url(url_link, accessor)) | url.replace_with(self._archive_url(url_link, accessor)) | ||
elif self._invidious_pattern.search(href): | elif (hasattr(self, '_invidious_pattern') | ||
and self._invidious_pattern.search(href)): | |||
# Nitter上のYouTubeへのリンクをInvidiousのものから直す | # Nitter上のYouTubeへのリンクをInvidiousのものから直す | ||
if re.match( | if re.match( | ||
1,464行目: | 1,499行目: | ||
if '#' in url: # フラグメント識別子の処理 | if '#' in url: # フラグメント識別子の処理 | ||
main_url, fragment = url.split('#', maxsplit=1) | main_url, fragment = url.split('#', maxsplit=1) | ||
return | return TableBuilder.archive_url( | ||
url, self._archive(main_url, accessor) + '#' + fragment, text) | url, self._archive(main_url, accessor) + '#' + fragment, text) | ||
else: | else: | ||
return | return TableBuilder.archive_url( | ||
url, self._archive(url, accessor), text) | url, self._archive(url, accessor), text) | ||
1,480行目: | 1,515行目: | ||
str: CallinShowLinkタグでラップしたURL。 | str: CallinShowLinkタグでラップしたURL。 | ||
""" | """ | ||
return | return TableBuilder.callinshowlink_url( | ||
url, self._archive(url, accessor)) | url, self._archive(url, accessor)) | ||
1,524行目: | 1,559行目: | ||
return archive_url | return archive_url | ||
def _get_tweet(self, accessor: AccessorHandler) -> | def _get_tweet(self, accessor: AccessorHandler) -> bool: | ||
"""ページからツイート本文を ``TableBuilder`` インスタンスに収めていく。 | """ページからツイート本文を ``TableBuilder`` インスタンスに収めていく。 | ||
ツイートの記録を中断するための文を発見するか、記録したツイート数が :const:`~LIMIT_N_TWEETS` | ツイートの記録を中断するための文を発見するか、記録したツイート数が :const:`~LIMIT_N_TWEETS` 件に達したら | ||
`False` を返す。 | |||
Args: | Args: | ||
accessor (AccessorHandler): | accessor (AccessorHandler): アクセスハンドラ。 | ||
Returns: | Returns: | ||
bool: 終わりにするツイートを発見するか、記録件数が上限に達したら `False`。 | |||
""" | """ | ||
soup: Final[BeautifulSoup] = BeautifulSoup( | soup: Final[BeautifulSoup] = BeautifulSoup( | ||
self._page, 'html.parser') | self._page, 'html.parser') | ||
tweets: Final[list[Tag]] = self._get_timeline_items( | tweets: Final[list[Tag]] = self._get_timeline_items(soup) | ||
for tweet in tweets: | |||
for tweet in tweets: | |||
tweet_a: Tag | None = tweet.a | tweet_a: Tag | None = tweet.a | ||
assert tweet_a is not None | assert tweet_a is not None | ||
1,560行目: | 1,595行目: | ||
if not_match: | if not_match: | ||
continue | continue | ||
# 日付の更新処理 | |||
date: Final[datetime] = self._tweet_date(tweet) | |||
self._table_builder.next_day_if_necessary(date) | |||
tweet_link: Tag | NavigableString | None = tweet.find( | tweet_link: Tag | NavigableString | None = tweet.find( | ||
1,566行目: | 1,605行目: | ||
href: str | list[str] | None = tweet_link.get('href') | href: str | list[str] | None = tweet_link.get('href') | ||
assert isinstance(href, str) | assert isinstance(href, str) | ||
tweet_url: str = urljoin( | tweet_url: Final[str] = urljoin( | ||
self.TWITTER_URL, | self.TWITTER_URL, | ||
re.sub('#[^#]*$', '', href)) | re.sub('#[^#]*$', '', href)) | ||
tweet_callinshow_template: Final[str] = self._callinshowlink_url( | |||
tweet_url, accessor) | tweet_url, accessor) | ||
tweet_content: Tag | NavigableString | None = tweet.find( | tweet_content: Tag | NavigableString | None = tweet.find( | ||
1,577行目: | 1,615行目: | ||
assert isinstance(tweet_content, Tag) | assert isinstance(tweet_content, Tag) | ||
self._archive_soup(tweet_content, accessor) | self._archive_soup(tweet_content, accessor) | ||
media_txt: str = self. | media_txt: Final[str] = self._fetch_tweet_media( | ||
quote_txt: str = self._get_tweet_quote(tweet, accessor) | tweet, | ||
poll_txt: str = self._get_tweet_poll(tweet) | tweet_url, | ||
accessor) | |||
quote_txt: Final[str] = self._get_tweet_quote(tweet, accessor) | |||
poll_txt: Final[str] = self._get_tweet_poll(tweet) | |||
self._table_builder.append( | self._table_builder.append( | ||
tweet_callinshow_template, '<br>\n'.join( | |||
filter( | filter( | ||
None, | None, | ||
[ | [ | ||
TableBuilder.escape_wiki_reserved_words( | |||
tweet_content.get_text()), | tweet_content.get_text()), | ||
quote_txt, media_txt, poll_txt | quote_txt, media_txt, poll_txt | ||
1,596行目: | 1,637行目: | ||
logger.info('目的ツイート発見でもう尾張屋根') | logger.info('目的ツイート発見でもう尾張屋根') | ||
self._table_builder.dump_file() | self._table_builder.dump_file() | ||
return False | |||
if self._table_builder.count >= self.LIMIT_N_TWEETS: | if self._table_builder.count >= self.LIMIT_N_TWEETS: | ||
logger.info(f'{self.LIMIT_N_TWEETS}件も記録している。もうやめにしませんか。') | logger.info(f'{self.LIMIT_N_TWEETS}件も記録している。もうやめにしませんか。') | ||
self._table_builder.dump_file() | self._table_builder.dump_file() | ||
return False | |||
return True | |||
def _go_to_new_page(self, accessor: AccessorHandler) -> | def _go_to_new_page(self, accessor: AccessorHandler) -> bool: | ||
"""Nitterで次のページに移動する。 | """Nitterで次のページに移動する。 | ||
次のページが無ければ `False` を返す。 | |||
Args: | Args: | ||
accessor (AccessorHandler): | accessor (AccessorHandler): アクセスハンドラ。 | ||
Returns: | Returns: | ||
bool: 次のページを取得できれば `True`。 | |||
""" | """ | ||
soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser') | soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser') | ||
1,631行目: | 1,673行目: | ||
if res is None: | if res is None: | ||
self._on_fail() | self._on_fail() | ||
return False | |||
new_page_soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') | new_page_soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') | ||
if new_page_soup.find( | if new_page_soup.find( | ||
1,637行目: | 1,680行目: | ||
logger.info(new_url + 'に移動しますを') | logger.info(new_url + 'に移動しますを') | ||
self._page = res # まだ残りツイートがあるのでページを返して再度ツイート本文収集 | self._page = res # まだ残りツイートがあるのでページを返して再度ツイート本文収集 | ||
return True | |||
else: | else: | ||
logger.info('急に残りツイートが無くなったな終了するか') | logger.info('急に残りツイートが無くなったな終了するか') | ||
self._table_builder.dump_file() | self._table_builder.dump_file() | ||
return False | |||
def execute(self, krsw: bool = False, use_browser: bool = True, | def execute(self, krsw: bool = False, use_browser: bool = True, | ||
enable_javascript: bool = True) -> NoReturn: | enable_javascript: bool = True) -> None | NoReturn: | ||
"""通信が必要な部分のロジック。 | """通信が必要な部分のロジック。 | ||
Args: | Args: | ||
krsw (bool, optional): | krsw (bool, optional): `True` の場合、名前が自動で :const:`~CALLINSHOW` になり、 | ||
クエリと終わりにするツイートが自動で無しになる。 | クエリと終わりにするツイートが自動で無しになる。 | ||
use_browser (bool, optional): | use_browser (bool, optional): `True` ならSeleniumを利用する。 | ||
`False` ならRequestsのみでアクセスする。 | |||
enable_javascript (bool, optional): SeleniumでJavaScriptを利用する場合は | enable_javascript (bool, optional): SeleniumでJavaScriptを利用する場合は | ||
`True`。 | |||
""" | """ | ||
# Seleniumドライバーを必ず終了するため、with文を利用する。 | # Seleniumドライバーを必ず終了するため、with文を利用する。 | ||
1,667行目: | 1,711行目: | ||
# 検索クエリの設定 | # 検索クエリの設定 | ||
self._set_queries(accessor, krsw) | if not self._set_queries(accessor, krsw): | ||
sys.exit(1) | |||
# ツイートを取得し終えるまでループ | # ツイートを取得し終えるまでループ | ||
while True: | while True: | ||
self._get_tweet(accessor) | if not self._get_tweet(accessor): | ||
self._go_to_new_page(accessor) | break | ||
if not self._go_to_new_page(accessor): | |||
break | |||
if | class UrlTuple(NamedTuple): | ||
if sys. | """URLとその魚拓のURLのペア。 | ||
""" | |||
logger.critical(' | url: str | ||
sys.exit(1) | """URL。 | ||
parser: ArgumentParser = ArgumentParser() | """ | ||
parser.add_argument( | archive_url: str | ||
'--krsw', | """魚拓のURL。 | ||
action='store_true', | """ | ||
help='指定すると、パカデブのツイートを取得上限数まで取得する。') | |||
parser.add_argument( | |||
'-n', | class ArchiveCrawler(TwitterArchiver): | ||
'--no_browser', | """archive.todayに記録された尊師のツイートのうち、Wiki未掲載のものを収集する。 | ||
action='store_true', | """ | ||
help='指定すると、Tor Browserを利用しない。') | |||
parser.add_argument( | TWEET_URL_PREFIX_DEFAULT: Final[str] = '17' | ||
'-d', | """Final[str]: ツイートURLの数字部分のうち、予め固定しておく部分。 | ||
'--disable_script', | |||
action='store_true', | ツイッターURLの数字部分がこの数字で始まるもののみをクロールする。 | ||
:func:`~_next_url` の `tweet_url_prefix` のデフォルト値。 | |||
""" | |||
INCREMENTED_NUM_DEFAULT: Final[int] = 0 | |||
"""Final[int]: ツイートURLの数字部分うち、インクリメントする桁のデフォルト値。 | |||
:const:`~TWEET_URL_PREFIX_DEFAULT` に続く桁をこの数字からインクリメントする。 | |||
:func:`~_next_url` の `incremented_num` のデフォルト値。 | |||
""" | |||
TEMPLATE_URL: Final[str] = 'https://krsw-wiki.org/wiki/テンプレート:降臨ショー恒心ログ' | |||
"""Final[str]: テンプレート:降臨ショー恒心ログのURL。 | |||
""" | |||
FILENAME: Final[str] = 'url_list.txt' | |||
"""Final[str]: URLのリストをダンプするファイル名。 | |||
""" | |||
@override | |||
def _set_queries(self, accessor: AccessorHandler, krsw: bool) -> bool: | |||
"""検索条件を設定する。 | |||
:class:`~TwitterArchiver` のメンバをセットし、ツイートを収集するアカウントを入力させる。 | |||
Args: | |||
accessor (AccessorHandler): アクセスハンドラ | |||
krsw (bool): `True` の場合、名前が :const:`~CALLINSHOW` になる。 | |||
""" | |||
# ユーザー名取得 | |||
if krsw: | |||
logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます') | |||
self._name: str = self.CALLINSHOW | |||
else: | |||
name_optional: str | None = self._get_name(accessor) | |||
if name_optional is not None: | |||
self._name: str = name_optional | |||
else: | |||
return False | |||
logger.info( | |||
'ユーザー名: @' + self._name + 'で検索しまふ' | |||
) | |||
self._url_list_on_wiki: list[str] = [] | |||
self._url_list: list[UrlTuple] = [] | |||
return True | |||
def _get_tweet_urls_from_wiki(self, accessor: AccessorHandler) -> None: | |||
"""Wikiに未掲載のツイートのURLリストを取得する。 | |||
Args: | |||
accessor (AccessorHandler): アクセスハンドラ。 | |||
""" | |||
def assert_get(tag: Tag, key: str) -> str: | |||
"""BeautifulSoupのタグから属性値を取得する。 | |||
Args: | |||
tag (Tag): BeautifulSoupのタグ。 | |||
key (str): 属性キー。 | |||
Returns: | |||
str: タグの属性値。 | |||
""" | |||
result: str | list[str] | None = tag.get(key) | |||
assert isinstance(result, str) | |||
return result | |||
template_page: Final[str | None] = ( | |||
accessor.request_with_requests_module(self.TEMPLATE_URL)) | |||
assert template_page is not None | |||
template_soup: Final[BeautifulSoup] = BeautifulSoup( | |||
template_page, | |||
'html.parser') | |||
urls: Final[list[str]] = list(map( | |||
lambda x: 'https://krsw-wiki.org' + assert_get(x, 'href'), | |||
template_soup.select('.wikitable > tbody > tr > td a'))) | |||
for url in urls: | |||
logger.info(f'{unquote(url)} で収集中でふ') | |||
page: Final[str | None] = ( | |||
accessor.request_with_requests_module(url)) | |||
assert page is not None | |||
soup: Final[BeautifulSoup] = BeautifulSoup(page, 'html.parser') | |||
url_as = soup.select('tr > th a') | |||
for url_a in url_as: | |||
href: str | list[str] | None = url_a.get('href') | |||
assert isinstance(href, str) | |||
if href.startswith(self.TWITTER_URL + self._name): | |||
self._url_list_on_wiki.append(href) | |||
def _append_tweet_urls(self, soup: BeautifulSoup) -> None: | |||
"""ツイートのURLを保存する。 | |||
Args: | |||
soup (BeautifulSoup): archive.todayでのURL検索結果のページのオブジェクト。 | |||
""" | |||
tweets: Final[ResultSet[Tag]] = soup.select( | |||
'#CONTENT > div > .TEXT-BLOCK') | |||
for tweet in tweets: | |||
a_last_child: Tag | None = tweet.select_one('a:last-child') | |||
assert a_last_child is not None | |||
url_matched: Final[Match[str]] | None = re.match( | |||
self.TWITTER_URL + self._name + r'/status/\d+', | |||
a_last_child.text) | |||
if url_matched is not None: | |||
a_first_child: Tag | None = tweet.select_one('a:first-child') | |||
assert a_first_child is not None | |||
archive_url: str | list[str] | None = a_first_child.get('href') | |||
assert isinstance(archive_url, str) | |||
if url_matched[0] not in self._url_list_on_wiki: | |||
# ツイートのURLが未取得のものならばURLを保存する | |||
self._url_list.append( | |||
UrlTuple(url_matched[0], archive_url)) | |||
self._url_list.sort(reverse=True, key=lambda x: x.url) # 降順 | |||
def _go_to_next( | |||
self, | |||
soup: BeautifulSoup, | |||
accessor: AccessorHandler) -> bool: | |||
"""archive.todayの検索結果のページをpaginateする。 | |||
Args: | |||
soup (BeautifulSoup): archive.todayでのURL検索結果のページのオブジェクト。 | |||
accessor (AccessorHandler): アクセスハンドラ。 | |||
Returns: | |||
bool: 次のページがあれば `True`。 | |||
""" | |||
next_a: Tag | None = soup.select_one('#next') | |||
if next_a is not None: | |||
link: str | list[str] | None = next_a.get('href') | |||
assert isinstance(link, str) | |||
page: Final[str | None] = accessor.request(link) | |||
assert page is not None | |||
self._page = page | |||
return True | |||
else: | |||
return False | |||
def _get_tweet_loop( | |||
self, | |||
soup: BeautifulSoup, | |||
accessor: AccessorHandler) -> None: | |||
"""archive.todayの検索結果に対して、paginateしながら未記載のツイートURLを記録する。 | |||
Args: | |||
soup (BeautifulSoup): archive.todayでのURL検索結果のページのオブジェクト。 | |||
accessor (AccessorHandler): アクセスハンドラ。 | |||
""" | |||
has_next: bool = True | |||
while has_next: | |||
self._append_tweet_urls(soup) | |||
has_next = self._go_to_next(soup, accessor) | |||
soup = BeautifulSoup(self._page) | |||
def _next_url( | |||
self, | |||
accessor: AccessorHandler, | |||
tweet_url_prefix: str, | |||
incremented_num: int) -> None: | |||
"""ツイートのURLを、数字部分をインクリメントしながら探索する。 | |||
`https://twitter.com/CallinShow/status/` に続く数字部分について、 | |||
`tweet_url_prefix` で始まるものを、その次の桁を `incremented_num` から9までインクリメントして探索する。 | |||
Args: | |||
accessor (AccessorHandler): アクセスハンドラ。 | |||
tweet_url_prefix (str): ツイートURLの数字部分のうち、インクリメントする桁以前の部分。 | |||
incremented_num (int): ツイートURLのうちインクリメントする桁の現在の数字。 | |||
Examples: | |||
`https://twitter.com/CallinShow/status/1707` で始まるURLをすべて探索する場合 | |||
:: | |||
self._next_url(accessor, '1707', 0) | |||
`https://twitter.com/CallinShow/status/165` で始まるURLから | |||
`https://twitter.com/CallinShow/status/169` で始まるURLまでをすべて探索する場合 | |||
:: | |||
self._next_url(accessor, '16', 5) | |||
""" | |||
logger.info(self.TWITTER_URL + self._name + '/status/' | |||
+ tweet_url_prefix + str(incremented_num) + '*を探索中') | |||
page: Final[str | None] = accessor.request( | |||
self.ARCHIVE_TODAY | |||
+ self.TWITTER_URL | |||
+ self._name | |||
+ '/status/' | |||
+ tweet_url_prefix | |||
+ str(incremented_num) + '*') | |||
assert page is not None | |||
soup: Final[BeautifulSoup] = BeautifulSoup(page, 'html.parser') | |||
pager: Tag | None = soup.select_one('#pager') | |||
if pager is not None: # 検索結果が複数ページ | |||
page_num_matched: Final[Match[str] | None] = re.search( | |||
r'of (\d+) urls', pager.text) | |||
assert page_num_matched is not None | |||
page_num: Final[int] = int(page_num_matched[1]) | |||
if page_num > 100: # ツイート数が100を超えると途中でreCAPTCHAが入るので、もっと細かく検索 | |||
self._next_url(accessor, | |||
tweet_url_prefix + str(incremented_num), 0) | |||
else: | |||
logger.debug( | |||
self.TWITTER_URL + self._name + '/status/' | |||
+ tweet_url_prefix + str(incremented_num) + '*からURLを収集しまふ') | |||
self._get_tweet_loop(soup, accessor) | |||
else: # 検索結果が1ページだけ | |||
if soup.select_one('.TEXT-BLOCK'): # 検索結果が存在する場合 | |||
logger.debug( | |||
self.TWITTER_URL + self._name + '/status/' | |||
+ tweet_url_prefix + str(incremented_num) + '*からURLを収集しまふ') | |||
self._get_tweet_loop(soup, accessor) | |||
# 次のurlを探索 | |||
if incremented_num == 9: | |||
return | |||
else: | |||
self._next_url(accessor, tweet_url_prefix, incremented_num + 1) | |||
@override | |||
def _tweet_date(self, tweet: Tag) -> datetime: | |||
datetime_tag: Final[Tag | None] = tweet.select_one('time[datetime]') | |||
assert datetime_tag is not None | |||
datetime_str: Final[str | list[str] | None] = ( | |||
datetime_tag.get('datetime')) | |||
assert isinstance(datetime_str, str) | |||
raw_time: datetime = datetime.strptime( | |||
datetime_str, | |||
'%Y-%m-%dT%H:%M:%SZ') | |||
return raw_time.replace(tzinfo=ZoneInfo('Asia/Tokyo')) | |||
@override | |||
def execute(self, krsw: bool = False, use_browser: bool = True, | |||
enable_javascript: bool = True) -> None | NoReturn: | |||
logger.info('Wikiに未掲載のツイートのURLを収集しますを') | |||
# Seleniumドライバーを必ず終了するため、with文を利用する。 | |||
with AccessorHandler(use_browser, enable_javascript) as accessor: | |||
# 実行前のチェック | |||
self._check_archive_instance(accessor) | |||
# 検索クエリの設定 | |||
if not self._set_queries(accessor, krsw): | |||
sys.exit(1) | |||
# Wikiに既に掲載されているツイートのURLを取得 | |||
self._get_tweet_urls_from_wiki(accessor) | |||
# 未掲載のツイートのURLを取得する | |||
self._next_url(accessor, | |||
self.TWEET_URL_PREFIX_DEFAULT, | |||
self.INCREMENTED_NUM_DEFAULT) | |||
with codecs.open(self.FILENAME, 'w', 'utf-8') as f: | |||
for url_pair in self._url_list: | |||
f.write(url_pair.url + '\n') | |||
logger.info('テキストファイル手に入ったやで〜') | |||
if __name__ == '__main__': | |||
if sys.version_info < (3, 12): | |||
print('Pythonのバージョンを3.12以上に上げて下さい') | |||
logger.critical('貴職のPythonのバージョン: ' + str(sys.version_info)) | |||
sys.exit(1) | |||
parser: Final[ArgumentParser] = ArgumentParser() | |||
parser.add_argument( | |||
'--krsw', | |||
action='store_true', | |||
help='指定すると、パカデブのツイートを取得上限数まで取得する。') | |||
parser.add_argument( | |||
'-n', | |||
'--no_browser', | |||
action='store_true', | |||
help='指定すると、Tor Browserを利用しない。') | |||
parser.add_argument( | |||
'-d', | |||
'--disable_script', | |||
action='store_true', | |||
help='指定すると、Tor BrowserでJavaScriptを利用しない。') | help='指定すると、Tor BrowserでJavaScriptを利用しない。') | ||
args: Namespace = parser.parse_args() | parser.add_argument( | ||
'-u', | |||
'--search_unarchived', | |||
action='store_true', | |||
help=('指定すると、Wikiに未掲載のツイートのURLをarchive.todayから収集する。' | |||
'リツイートのURLも収集してしまうので注意。')) | |||
args: Final[Namespace] = parser.parse_args() | |||
logger.debug('args: ' + str(args)) | |||
twitter_archiver: TwitterArchiver = TwitterArchiver() | twitter_archiver: Final[TwitterArchiver] = ArchiveCrawler() if ( | ||
args.search_unarchived) else TwitterArchiver() | |||
twitter_archiver.execute( | twitter_archiver.execute( | ||
args.krsw, | args.krsw, |