「利用者:夜泣き/スクリプト」の版間の差分
>Fet-Fe (→コード: v4.0.2 JavaScriptをオフにするオプションと、Tor Browserを利用しないオプションを追加) |
>Fet-Fe (→コード: v4.0.3 pythonのバージョン指定を3.11以上に変更テーブル組み立てを別のクラスに逃す) |
||
7行目: | 7行目: | ||
"""Twitter自動収集スクリプト | """Twitter自動収集スクリプト | ||
ver4.0. | ver4.0.3 2023/9/28恒心 | ||
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です | 当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です | ||
23行目: | 23行目: | ||
$ python3 (ファイル名) --krsw | $ python3 (ファイル名) --krsw | ||
``-- | ``--no_browser`` オプションでTor Browserを使用しないモードに、 | ||
``--disable_script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。 | |||
自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。 | 自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。 | ||
29行目: | 30行目: | ||
Note: | Note: | ||
* Pythonのバージョンは3. | * Pythonのバージョンは3.11以上 | ||
* 環境は玉葱前提です。 | * 環境は玉葱前提です。 | ||
* Whonix-Workstation, MacOSで動作確認済 | * Whonix-Workstation, MacOSで動作確認済 | ||
45行目: | 46行目: | ||
""" | """ | ||
import codecs | import codecs | ||
import json | import json | ||
import logging | |||
import os | |||
import platform | |||
import random | import random | ||
import re | |||
import subprocess | |||
import sys | |||
import warnings | |||
from argparse import ArgumentParser, Namespace | |||
from collections.abc import Callable | |||
from datetime import datetime | from datetime import datetime | ||
from | from logging import Logger, getLogger | ||
from time import sleep | from time import sleep | ||
from types import MappingProxyType | from types import MappingProxyType, TracebackType | ||
from typing import Final, NoReturn | from typing import Any, Final, NoReturn, Self | ||
from urllib.parse import quote, unquote, urljoin | from urllib.parse import quote, unquote, urljoin | ||
from zoneinfo import ZoneInfo | |||
from | |||
import requests | import requests | ||
from bs4 import BeautifulSoup | from bs4 import BeautifulSoup | ||
from bs4.element import NavigableString, ResultSet, Tag | |||
from selenium import webdriver | from selenium import webdriver | ||
from selenium.common.exceptions import NoSuchElementException, WebDriverException | from selenium.common.exceptions import (NoSuchElementException, | ||
WebDriverException) | |||
from selenium.webdriver.common.by import By | |||
from selenium.webdriver.firefox.options import Options | from selenium.webdriver.firefox.options import Options | ||
from selenium.webdriver.support import expected_conditions as ec | |||
from selenium.webdriver.support import expected_conditions as | from selenium.webdriver.support.wait import WebDriverWait | ||
from selenium.webdriver. | |||
logging.basicConfig(level=logging.INFO, | |||
format='{asctime} [{levelname:.4}] : {message}', style='{') | |||
logger: Logger = getLogger(__name__) | logger: Logger = getLogger(__name__) | ||
# おそらくはツイートの内容によってMarkupResemblesLocatorWarningが吐き出されることがあるので無効化 | |||
warnings.simplefilter('ignore') | warnings.simplefilter('ignore') | ||
87行目: | 91行目: | ||
class | class ReCaptchaRequiredError(Exception): | ||
"""JavaScriptがオフの時にreCAPTCHAを要求された場合のエラー。 | """JavaScriptがオフの時にreCAPTCHAを要求された場合のエラー。 | ||
""" | """ | ||
118行目: | 122行目: | ||
自動操縦だとWebサイトに見破られないため。 | 自動操縦だとWebサイトに見破られないため。 | ||
""" | """ | ||
sleep(random.randrange(self.WAIT_TIME, self.WAIT_TIME + self.WAIT_RANGE)) | sleep(random.randrange(self.WAIT_TIME, | ||
self.WAIT_TIME + self.WAIT_RANGE)) | |||
153行目: | 158行目: | ||
""" | """ | ||
self._proxies: dict[str, str] | None = None | self._proxies: dict[str, str] | None = None | ||
self._proxies = self. | self._proxies = self._choose_tor_proxies() # Torに必要なプロキシをセット | ||
def _execute(self, url: | def _execute(self, | ||
url: str, | |||
proxies: dict[str, | |||
str] | None) -> requests.models.Response: | |||
"""引数のURLにrequestsモジュールでHTTP接続する。 | """引数のURLにrequestsモジュールでHTTP接続する。 | ||
Args: | Args: | ||
url | url str: 接続するURL。 | ||
proxies dir[str, str]: | proxies dir[str, str]: 接続に利用するプロキシ。 | ||
デフォルトでは :func:`~_choose_tor_proxies` で設定した値を利用する。 | |||
Returns: | Returns: | ||
169行目: | 178行目: | ||
AccessError: ステータスコードが200でない場合のエラー。 | AccessError: ステータスコードが200でない場合のエラー。 | ||
""" | """ | ||
sleep(self.WAIT_TIME) | sleep(self.WAIT_TIME) # DoS対策で待つ | ||
try: | try: | ||
res: requests.models.Response = requests.get(url, timeout=self.REQUEST_TIMEOUT, headers=self.HEADERS, allow_redirects=False, proxies=proxies if proxies is not None else self._proxies) | res: requests.models.Response = requests.get( | ||
res.raise_for_status() | url, | ||
timeout=self.REQUEST_TIMEOUT, | |||
headers=self.HEADERS, | |||
allow_redirects=False, | |||
proxies=proxies if proxies is not None else self._proxies) | |||
res.raise_for_status() # HTTPステータスコードが200番台以外でエラー発生 | |||
except requests.exceptions.ConnectionError: | except requests.exceptions.ConnectionError: | ||
raise | raise | ||
except requests.exceptions.RequestException as e: | except requests.exceptions.RequestException as e: | ||
# requestsモジュール固有の例外を共通の例外に変換 | # requestsモジュール固有の例外を共通の例外に変換 | ||
raise AccessError(str(e)) from | raise AccessError(str(e)) from e | ||
return res | return res | ||
def get(self, url: | def get(self, | ||
url: str, | |||
proxies: dict[str, | |||
str] | None = None) -> str: | |||
"""引数のURLにrequestsモジュールでHTTP接続する。 | """引数のURLにrequestsモジュールでHTTP接続する。 | ||
Args: | Args: | ||
url | url str: 接続するURL。 | ||
proxies dir[str, str]: 接続に利用するプロキシ。デフォルトでは :func:`~ | proxies dir[str, str]: 接続に利用するプロキシ。デフォルトでは :func:`~_choose_tor_proxies` で設定した値を利用する。 | ||
Returns: | Returns: | ||
199行目: | 216行目: | ||
raise | raise | ||
def get_image(self, url: | def get_image(self, url: str) -> bytes | None: | ||
"""引数のURLから画像のバイナリ列を取得する。 | """引数のURLから画像のバイナリ列を取得する。 | ||
Args: | Args: | ||
url | url str: 接続するURL | ||
Returns: | Returns: | ||
222行目: | 239行目: | ||
return None | return None | ||
def | def _choose_tor_proxies(self) -> dict[str, str] | None | NoReturn: | ||
"""Torを使うのに必要なプロキシ情報を返す。 | """Torを使うのに必要なプロキシ情報を返す。 | ||
プロキシなしで接続できればNone、 | |||
Tor Browserのプロキシで接続できるなら :const:`~PROXIES_WITH_BROWSER`、 | |||
torコマンドのプロキシで接続できるなら :const:`~PROXIES_WITH_COMMAND`。 | |||
いずれでもアクセスできなければ異常終了する。 | いずれでもアクセスできなければ異常終了する。 | ||
234行目: | 253行目: | ||
RuntimeError: :const:`~TOR_CHECK_URL` にアクセスできない場合のエラー。 | RuntimeError: :const:`~TOR_CHECK_URL` にアクセスできない場合のエラー。 | ||
""" | """ | ||
logger.info('Torのチェック中ですを') | |||
# プロキシなしでTorにアクセスできるかどうか | # プロキシなしでTorにアクセスできるかどうか | ||
res: str = self.get(self.TOR_CHECK_URL, proxies=None) | res: str = self.get(self.TOR_CHECK_URL, proxies=None) | ||
is_tor: bool = json.loads(res)['IsTor'] | is_tor: bool = json.loads(res)['IsTor'] | ||
if is_tor: | if is_tor: | ||
logger.info('Tor connection OK') | |||
return None | return None | ||
# Tor BrowserのプロキシでTorにアクセスできるかどうか | # Tor BrowserのプロキシでTorにアクセスできるかどうか | ||
try: | try: | ||
res = self.get(self.TOR_CHECK_URL, proxies=self.PROXIES_WITH_BROWSER) | res = self.get(self.TOR_CHECK_URL, | ||
proxies=self.PROXIES_WITH_BROWSER) | |||
is_tor = json.loads(res)['IsTor'] | is_tor = json.loads(res)['IsTor'] | ||
if is_tor: | if is_tor: | ||
logger.info('Tor connection OK') | |||
return self.PROXIES_WITH_BROWSER | return self.PROXIES_WITH_BROWSER | ||
except requests.exceptions.ConnectionError: | except requests.exceptions.ConnectionError: | ||
254行目: | 274行目: | ||
# torコマンドのプロキシでTorにアクセスできるかどうか | # torコマンドのプロキシでTorにアクセスできるかどうか | ||
try: | try: | ||
res = self.get(self.TOR_CHECK_URL, proxies=self.PROXIES_WITH_COMMAND) | res = self.get(self.TOR_CHECK_URL, | ||
proxies=self.PROXIES_WITH_COMMAND) | |||
is_tor = json.loads(res)['IsTor'] | is_tor = json.loads(res)['IsTor'] | ||
if is_tor: | if is_tor: | ||
logger.info('Tor proxy OK') | |||
return self.PROXIES_WITH_COMMAND | return self.PROXIES_WITH_COMMAND | ||
else: | else: | ||
raise RuntimeError('サイトにTorのIPでアクセスできていないなりを') | raise RuntimeError('サイトにTorのIPでアクセスできていないなりを') | ||
except requests.exceptions.ConnectionError as e: | except requests.exceptions.ConnectionError as e: | ||
logger.critical(e) | |||
logger.critical('通信がTorのSOCKS proxyを経由していないなりを') | |||
exit(1) | exit(1) | ||
281行目: | 302行目: | ||
TOR_BROWSER_PATHS: MappingProxyType[str, str] = MappingProxyType({ | TOR_BROWSER_PATHS: MappingProxyType[str, str] = MappingProxyType({ | ||
'Windows': '', | |||
'Darwin': '/Applications/Tor Browser.app/Contents/MacOS/firefox', | |||
'Linux': '/usr/bin/torbrowser' | |||
}) | }) | ||
"""MappingProxyType[str, str]: OSごとのTor Browserのパス。 | """MappingProxyType[str, str]: OSごとのTor Browserのパス。 | ||
309行目: | 330行目: | ||
if enable_javascript: | if enable_javascript: | ||
logger.warning('reCAPTCHA対策のためJavaScriptをonにしますを') | |||
options.preferences.update({ | options.preferences.update({ | ||
'javascript.enabled': enable_javascript, | |||
'intl.accept_languages': 'en-US, en', | |||
'intl.locale.requested': 'US', | |||
'font.language.group': 'x-western', | |||
'dom.webdriver.enabled': False # 自動操縦と見破られないための設定 | |||
}) | }) | ||
322行目: | 343行目: | ||
self.driver: webdriver.Firefox = webdriver.Firefox(options=options) | self.driver: webdriver.Firefox = webdriver.Firefox(options=options) | ||
sleep(1) | sleep(1) | ||
wait_init: WebDriverWait = WebDriverWait(self.driver, self.WAIT_TIME_FOR_INIT) | wait_init: WebDriverWait = WebDriverWait(self.driver, | ||
wait_init.until( | self.WAIT_TIME_FOR_INIT) | ||
self.driver.find_element(By.ID, | wait_init.until( | ||
wait_init.until( | ec.element_to_be_clickable((By.ID, 'connectButton')) | ||
) | |||
self.driver.find_element(By.ID, 'connectButton').click() | |||
wait_init.until(ec.url_contains('about:blank')) # Torの接続が完了するまで待つ | |||
self.wait: WebDriverWait = WebDriverWait(self.driver, self.REQUEST_TIMEOUT) | self.wait: WebDriverWait = WebDriverWait(self.driver, | ||
except | self.REQUEST_TIMEOUT) | ||
except BaseException: | |||
self.quit() | self.quit() | ||
raise | raise | ||
342行目: | 367行目: | ||
Raises: | Raises: | ||
ReCaptchaRequiredError: JavaScriptがオフの状態でreCAPTCHAが要求された場合のエラー。 | |||
Todo: | |||
botバレしたときに自動で他のTorサーキットに接続し直す。 | |||
""" | """ | ||
try: | try: | ||
self.driver.find_element(By.CSS_SELECTOR, 'script[src^="https://www.google.com/recaptcha/api.js"]') | self.driver.find_element( # 要素がない時に例外を吐く | ||
By.CSS_SELECTOR, | |||
'script[src^="https://www.google.com/recaptcha/api.js"]') | |||
if self._javascript_enabled: | if self._javascript_enabled: | ||
logger.warning('reCAPTCHAを解いてね(笑)、それはできるよね。') | |||
logger.warning('botバレしたらNew Tor circuit for this siteを選択するナリよ') | |||
WebDriverWait(self.driver, 10000).until( | WebDriverWait( | ||
sleep(self.WAIT_TIME) | self.driver, | ||
10000).until( | |||
ec.staleness_of( | |||
self.driver.find_element( | |||
By.CSS_SELECTOR, | |||
'script[src^="https://www.google.com/recaptcha/api.js"]'))) | |||
sleep(self.WAIT_TIME) # DoS対策で待つ | |||
else: | else: | ||
raise | raise ReCaptchaRequiredError( | ||
'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: ' | |||
+ self.driver.current_url) | |||
except NoSuchElementException: | except NoSuchElementException: | ||
# reCAPTCHAの要素がなければそのまま | # reCAPTCHAの要素がなければそのまま | ||
pass | pass | ||
def get(self, url: | def get(self, url: str) -> str: | ||
"""引数のURLにSeleniumでHTTP接続する。 | """引数のURLにSeleniumでHTTP接続する。 | ||
Args: | Args: | ||
url | url str: 接続するURL。 | ||
Returns: | Returns: | ||
str: レスポンスのHTML。 | str: レスポンスのHTML。 | ||
""" | """ | ||
self._random_sleep() | self._random_sleep() # DoS対策で待つ | ||
try: | try: | ||
self.driver.get(url) | self.driver.get(url) | ||
372行目: | 410行目: | ||
except WebDriverException as e: | except WebDriverException as e: | ||
# Selenium固有の例外を共通の例外に変換 | # Selenium固有の例外を共通の例外に変換 | ||
raise AccessError(str(e)) from | raise AccessError(str(e)) from e | ||
return self.driver.page_source | return self.driver.page_source | ||
399行目: | 437行目: | ||
enable_javascript bool: SeleniumでJavaScriptを利用する場合はTrue。 | enable_javascript bool: SeleniumでJavaScriptを利用する場合はTrue。 | ||
""" | """ | ||
self.selenium_accessor: SeleniumAccessor | None = SeleniumAccessor(enable_javascript) if use_browser else None | self.selenium_accessor: SeleniumAccessor | None = SeleniumAccessor( | ||
enable_javascript) if use_browser else None | |||
self.requests_accessor: RequestsAccessor = RequestsAccessor() | self.requests_accessor: RequestsAccessor = RequestsAccessor() | ||
def __enter__(self) -> Self: | def __enter__(self) -> Self: | ||
"""withブロックの開始時に実行する。 | """withブロックの開始時に実行する。 | ||
Returns: | |||
Self: オブジェクト自身。 | |||
""" | """ | ||
return self | return self | ||
def __exit__(self, | def __exit__(self, | ||
exc_type: type[BaseException] | None, | |||
exc_value: BaseException | None, | |||
traceback: TracebackType | None) -> None: | |||
"""withブロックの終了時に実行する。 | """withブロックの終了時に実行する。 | ||
Args: | |||
exc_type (type[BaseException] | None): コンテキスト内で例外を吐いた場合の例外タイプ。 | |||
exc_value (BaseException | None): コンテキスト内で例外を吐いた場合の例外。 | |||
traceback (TracebackType | None): コンテキスト内で例外を吐いた場合のトレースバック。 | |||
""" | """ | ||
if self.selenium_accessor is not None: | if self.selenium_accessor is not None: | ||
self.selenium_accessor.quit() | self.selenium_accessor.quit() | ||
def request_once(self, url: | def request_once(self, url: str) -> str: | ||
"""引数のURLにHTTP接続する。 | """引数のURLにHTTP接続する。 | ||
Args: | Args: | ||
url | url str: 接続するURL。 | ||
Returns: | Returns: | ||
436行目: | 486行目: | ||
raise | raise | ||
def _request_with_callable(self, url: | def _request_with_callable(self, | ||
url: str, | |||
request_callable: Callable[[str], | |||
Any]) -> Any | None: | |||
"""request_callableの実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | """request_callableの実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | ||
443行目: | 496行目: | ||
Args: | Args: | ||
url | url str: 接続するURL | ||
request_callable Callable[[str], Any]: 1回リクエストを行うメソッド。 | request_callable Callable[[str], Any]: 1回リクエストを行うメソッド。 | ||
Returns: | Returns: | ||
Any | None: レスポンス。接続失敗が何度も起きるとNoneを返す。 | Any | None: レスポンス。接続失敗が何度も起きるとNoneを返す。 | ||
""" | """ | ||
for i in range(1, self.LIMIT_N_REQUESTS + 1): | for i in range(1, self.LIMIT_N_REQUESTS + 1): | ||
456行目: | 506行目: | ||
res: Any = request_callable(url) | res: Any = request_callable(url) | ||
except AccessError: | except AccessError: | ||
logger.warning( | |||
url | |||
+ 'への通信失敗ナリ ' | |||
+ f'{i}/{self.LIMIT_N_REQUESTS}回') | |||
if i < self.LIMIT_N_REQUESTS: | if i < self.LIMIT_N_REQUESTS: | ||
sleep(self.WAIT_TIME_FOR_ERROR) | sleep(self.WAIT_TIME_FOR_ERROR) # 失敗時は長めに待つ | ||
else: | else: | ||
return res | return res | ||
return None | return None | ||
def request(self, url: | def request(self, url: str) -> str | None: | ||
"""HTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | """HTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | ||
470行目: | 523行目: | ||
Args: | Args: | ||
url | url str: 接続するURL | ||
Returns: | Returns: | ||
480行目: | 533行目: | ||
return self._request_with_callable(url, self.request_once) | return self._request_with_callable(url, self.request_once) | ||
def request_with_requests_module(self, url: | def request_with_requests_module(self, url: str) -> str | None: | ||
"""requestsモジュールでのHTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | """requestsモジュールでのHTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | ||
487行目: | 540行目: | ||
Args: | Args: | ||
url | url str: 接続するURL | ||
Returns: | Returns: | ||
497行目: | 550行目: | ||
return self._request_with_callable(url, self.requests_accessor.get) | return self._request_with_callable(url, self.requests_accessor.get) | ||
def request_image(self, url: | def request_image(self, url: str) -> bytes | None: | ||
"""requestsモジュールで画像ファイルを取得します。 | """requestsモジュールで画像ファイルを取得します。 | ||
509行目: | 562行目: | ||
bytes | None: レスポンスのバイト列。接続失敗が何度も起きるとNoneを返します。 | bytes | None: レスポンスのバイト列。接続失敗が何度も起きるとNoneを返します。 | ||
""" | """ | ||
return self._request_with_callable(url, self.requests_accessor.get_image) | return self._request_with_callable(url, | ||
self.requests_accessor.get_image) | |||
@property | @property | ||
def proxies(self) -> dict[str, str] | None: | def proxies(self) -> dict[str, str] | None: | ||
"""RequestsAccessorオブジェクトのプロキシ設定を返す。 | |||
Returns: | |||
dict[str, str] | None: RequestsAccessorオブジェクトのプロキシ設定。 | |||
""" | |||
return self.requests_accessor.proxies | return self.requests_accessor.proxies | ||
class TableBuilder: | |||
def __init__(self, date: datetime): | |||
self._tables: list[str] = [''] | |||
self._count: int = 0 # 記録数 | |||
self._date: datetime = date | |||
@property | |||
def count(self) -> int: | |||
return self._count | |||
def append(self, archived_tweet_url: str, text: str) -> None: | |||
self._tables[0] = '!' + archived_tweet_url + '\n|-\n|\n' \ | |||
+ text \ | |||
+ '\n|-\n' \ | |||
+ self._tables[0] # wikiの文法に変化 | |||
self._count += 1 | |||
def dump_file(self) -> NoReturn: | |||
"""Wikiテーブルをファイル出力し、プログラムを終了する。 | |||
""" | |||
self._next_day() | |||
result_txt: Final[str] = '\n'.join(self._tables) | |||
with codecs.open('tweet.txt', 'w', 'utf-8') as f: | |||
f.write(result_txt) | |||
logger.info('テキストファイル手に入ったやで〜') | |||
exit(0) | |||
def next_day_if_necessary(self, date: datetime | None = None) -> None: | |||
if date is None: | |||
return | |||
if date.year != self._date.year or date.month != self._date.month or date.day != self._date.day: | |||
self._next_day(date) | |||
def _next_day(self, date: datetime | None = None) -> None: | |||
if self._tables[0]: | |||
self._tables[0] = self._convert_to_text_table(self._tables[0]) | |||
if os.name == 'nt': # Windows | |||
self._txt_data[0] = self._date.strftime( | |||
'\n=== %#m月%#d日 ===\n') + self._txt_data[0] | |||
logger.info(self._date.strftime('%#m月%#d日のツイートを取得完了ですを')) | |||
else: # Mac or Linux | |||
self._tables[0] = self._date.strftime( | |||
'\n=== %-m月%-d日 ===\n') + self._tables[0] | |||
logger.info(self._date.strftime('%-m月%-d日のツイートを取得完了ですを')) | |||
if date is not None: | |||
self._tables.insert(0, '') | |||
self._date = date | |||
def _convert_to_text_table(self, text: str) -> str: | |||
"""``self._txt_data[0]`` にwikiでテーブル表示にするためのヘッダとフッタをつける。 | |||
Args: | |||
text str: ヘッダとフッタがないWikiテーブル。 | |||
Returns: | |||
str: テーブル表示用のヘッダとフッタがついたWikiテーブル。 | |||
""" | |||
return '{|class="wikitable" style="text-align: left;"\n' + text + '|}' | |||
@staticmethod | |||
def escape_wiki_reserved_words(text: str) -> str: | |||
"""MediaWikiの文法と衝突する文字を無効化する。 | |||
Args: | |||
text str: ツイートの文字列。 | |||
Returns: | |||
str: MediaWikiの文法と衝突する文字がエスケープされたツイートの文字列。 | |||
""" | |||
def escape_nolink_urls(text: str) -> str: | |||
"""Archiveテンプレートの中にないURLがWikiでaタグに変換されないよう無効化する。 | |||
Args: | |||
text str: ツイートの文字列。 | |||
Returns: | |||
str: Archiveテンプレートの中にないURLがnowikiタグでエスケープされた文字列。 | |||
""" | |||
is_in_archive_template: bool = False | |||
i: int = 0 | |||
while i < len(text): | |||
if is_in_archive_template: | |||
if text[i:i + 2] == '}}': | |||
is_in_archive_template = False | |||
i += 2 | |||
else: | |||
if text[i:i + 10] == '{{Archive|' or text[i:i + 10] == '{{archive|': | |||
is_in_archive_template = True | |||
i += 10 | |||
elif text[i:i + 8] == 'https://': | |||
text = text[:i] + \ | |||
'<nowiki>https://</nowiki>' + text[i + 8:] | |||
i += 25 | |||
elif text[i:i + 7] == 'http://': | |||
text = text[:i] + \ | |||
'<nowiki>http://</nowiki>' + text[i + 7:] | |||
i += 24 | |||
i += 1 | |||
return text | |||
text = text.replace('\n', '<br>\n') | |||
text = re.sub(r'^ ', ' ', text, flags=re.MULTILINE) | |||
text = re.sub( | |||
r'^([\*#:;])', | |||
r'<nowiki>\1</nowiki>', | |||
text, | |||
flags=re.MULTILINE) | |||
text = re.sub( | |||
r'^----', | |||
'<nowiki>----</nowiki>', | |||
text, | |||
flags=re.MULTILINE) | |||
text = escape_nolink_urls(text) | |||
return text | |||
@staticmethod | |||
def archive_url( | |||
url: str, | |||
archived_url: str, | |||
text: str | None = None) -> str: | |||
"""URLをArchiveテンプレートでラップする。 | |||
Args: | |||
url str: ラップするURL。 | |||
archive_url str: ラップするURLの魚拓のURL。 | |||
text str | None: ArchiveテンプレートでURLの代わりに表示する文字列。 | |||
Returns: | |||
str: ArchiveタグでラップしたURL。 | |||
""" | |||
if text is None: | |||
return '{{Archive|1=' + unquote(url) + '|2=' + archived_url + '}}' | |||
else: | |||
return '{{Archive|1=' + \ | |||
unquote(url) + '|2=' + archived_url + '|3=' + text + '}}' | |||
@staticmethod | |||
def callinshowlink_url(url: str, archived_url: str) -> str: | |||
"""URLをCallinShowLinkテンプレートでラップする。 | |||
Args: | |||
url str: ラップするURL。 | |||
archive_url str: ラップするURLの魚拓のURL。 | |||
Returns: | |||
str: CallinShowLinkタグでラップしたURL。 | |||
""" | |||
return '{{CallinShowLink|1=' + url + '|2=' + archived_url + '}}' | |||
563行目: | 773行目: | ||
""" | """ | ||
LIMIT_N_TWEETS: Final[int] = | LIMIT_N_TWEETS: Final[int] = 100 | ||
"""Final[int]: 取得するツイート数の上限。 | """Final[int]: 取得するツイート数の上限。 | ||
""" | """ | ||
573行目: | 783行目: | ||
TWEETS_OR_REPLIES: Final[str] = 'with_replies' | TWEETS_OR_REPLIES: Final[str] = 'with_replies' | ||
"""Final[str]: NitterのURLのドメインとユーザーネーム部分の接続部品。 | """Final[str]: NitterのURLのドメインとユーザーネーム部分の接続部品。 | ||
""" | """ | ||
603行目: | 810行目: | ||
"""コンストラクタ。 | """コンストラクタ。 | ||
""" | """ | ||
self._check_slash() | self._check_slash() # スラッシュが抜けてないかチェック | ||
def _set_queries(self, accessor: AccessorHandler, krsw: bool): | def _set_queries(self, accessor: AccessorHandler, krsw: bool) -> None: | ||
"""検索条件を設定する。 | """検索条件を設定する。 | ||
618行目: | 823行目: | ||
""" | """ | ||
# ユーザー名取得 | |||
if krsw: | if krsw: | ||
logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます') | |||
self._name: str = self.CALLINSHOW if krsw else self._get_name(accessor) | |||
# 検索クエリとページ取得 | |||
self.query_strs: list[str] = [] | self.query_strs: list[str] = [] | ||
if krsw: | if krsw: | ||
logger.info('クエリは自動的になしにナリます') | |||
else: | else: | ||
self._get_query() | self._get_query() | ||
page_optional: str | None = accessor.request( | |||
if | urljoin(self.NITTER_INSTANCE, self._name + '/' | ||
+ self.TWEETS_OR_REPLIES)) | |||
if page_optional is None: | |||
self._fail() | self._fail() | ||
self._page: str = page_optional | |||
# 終わりにするツイート取得 | |||
if krsw: | if krsw: | ||
logger.info('終わりにするツイートは自動的になしにナリます') | |||
self._stop: str = '' if krsw else self._stop_word() | |||
self. | # 日付取得 | ||
timeline_item: Tag | NavigableString | None = BeautifulSoup( | |||
self._page, 'html.parser').find( | |||
class_='timeline-item') | |||
assert isinstance(timeline_item, Tag) | |||
date: datetime = self._tweet_date(timeline_item) | |||
self._table_builder: TableBuilder = TableBuilder(date) | |||
self. | |||
def _check_slash(self) -> None | NoReturn: | def _check_slash(self) -> None | NoReturn: | ||
655行目: | 863行目: | ||
Raises: | Raises: | ||
RuntimeError: URLの最後にスラッシュがついていない場合に出る。 | RuntimeError: URLの最後にスラッシュがついていない場合に出る。 | ||
""" | """ | ||
if self.NITTER_INSTANCE[-1] != '/': | if self.NITTER_INSTANCE[-1] != '/': | ||
668行目: | 873行目: | ||
raise RuntimeError('TWITTER_URLの末尾には/が必須です') | raise RuntimeError('TWITTER_URLの末尾には/が必須です') | ||
def _check_nitter_instance(self, accessor: AccessorHandler) -> None | NoReturn: | def _check_nitter_instance( | ||
self, accessor: AccessorHandler) -> None | NoReturn: | |||
"""Nitterのインスタンスが生きているかチェックする。 | """Nitterのインスタンスが生きているかチェックする。 | ||
死んでいたらそこで終了。 | 死んでいたらそこで終了。 | ||
接続を一回しか試さない :func:`~_request_once` | 接続を一回しか試さない :func:`~_request_once` を使っているのは、 | ||
激重インスタンスが指定されたとき試行回数増やして偶然成功してそのまま実行されるのを躱すため。 | |||
Args: | Args: | ||
680行目: | 887行目: | ||
None | NoReturn: NitterにアクセスできればNone。できなければ終了。 | None | NoReturn: NitterにアクセスできればNone。できなければ終了。 | ||
""" | """ | ||
logger.info('Nitterのインスタンスチェック中ですを') | |||
try: | try: | ||
accessor.request_once(self.NITTER_INSTANCE) | accessor.request_once(self.NITTER_INSTANCE) | ||
except AccessError as e: | except AccessError as e: # エラー発生時は終了 | ||
logger.critical(e) | |||
logger.critical('インスタンスが死んでますを') | |||
exit(1) | exit(1) | ||
logger.info('Nitter OK') | |||
def _check_archive_instance(self, accessor: AccessorHandler) -> None | NoReturn: | def _check_archive_instance( | ||
self, accessor: AccessorHandler) -> None | NoReturn: | |||
"""archive.todayのTor用インスタンスが生きているかチェックする。 | """archive.todayのTor用インスタンスが生きているかチェックする。 | ||
698行目: | 906行目: | ||
None | NoReturn: archive.todayのTorインスタンスにアクセスできればNone。できなければ終了。 | None | NoReturn: archive.todayのTorインスタンスにアクセスできればNone。できなければ終了。 | ||
""" | """ | ||
logger.info('archive.todayのTorインスタンスチェック中ですを') | |||
try: | try: | ||
accessor.request_once(self.ARCHIVE_TODAY) | accessor.request_once(self.ARCHIVE_TODAY) | ||
except AccessError as e: | except AccessError as e: # エラー発生時は終了 | ||
logger.critical(e) | |||
logger.critical('インスタンスが死んでますを') | |||
exit(1) | exit(1) | ||
logger.info('archive.today OK') | |||
def _invidious_instances(self, accessor: AccessorHandler) -> tuple[str] | NoReturn: | def _invidious_instances( | ||
self, | |||
accessor: AccessorHandler) -> tuple[str, ...] | NoReturn: | |||
"""Invidiousのインスタンスのタプルを取得する。 | """Invidiousのインスタンスのタプルを取得する。 | ||
714行目: | 924行目: | ||
Returns: | Returns: | ||
tuple[str] | NoReturn: Invidiousのインスタンスのタプル。Invidiousのインスタンスが死んでいれば終了。 | tuple[str, ...] | NoReturn: Invidiousのインスタンスのタプル。Invidiousのインスタンスが死んでいれば終了。 | ||
""" | """ | ||
logger.info('Invidiousのインスタンスリストを取得中ですを') | |||
invidious_json: Final[str] | None = accessor.request_with_requests_module('https://api.invidious.io/instances.json') | invidious_json: Final[str] | None = accessor.request_with_requests_module( | ||
'https://api.invidious.io/instances.json') | |||
if invidious_json is None: | if invidious_json is None: | ||
logger.critical('Invidiousが死んでますを') | |||
exit(1) | exit(1) | ||
instance_list: list[str] = [] | instance_list: list[str] = [] | ||
744行目: | 955行目: | ||
""" | """ | ||
while True: | while True: | ||
logger.info( | |||
account_str: str = input() | 'アカウント名を入れなければない。空白だと自動的に' | ||
+ self.CALLINSHOW | |||
+ 'になりますを') | |||
account_str: str = input() | |||
# 空欄で降臨ショー | |||
if account_str == '': | if account_str == '': | ||
return self.CALLINSHOW | return self.CALLINSHOW | ||
else: | else: | ||
res: Final[str | res: Final[str | None] = accessor.request( | ||
if res is None: | urljoin(self.NITTER_INSTANCE, account_str)) | ||
if res is None: # リクエスト失敗判定 | |||
self._fail() | self._fail() | ||
soup: BeautifulSoup = BeautifulSoup(res, 'html.parser') | soup: BeautifulSoup = BeautifulSoup(res, 'html.parser') | ||
if soup.title == self.NITTER_ERROR_TITLE: | if soup.title == self.NITTER_ERROR_TITLE: | ||
logger.warning(account_str + 'は実在の人物ではありませんでした') | |||
else: | else: | ||
logger.info('最終的に出会ったのが@' + account_str + 'だった。') | |||
return account_str | return account_str | ||
def _get_query(self) -> None: | def _get_query(self) -> None: | ||
765行目: | 980行目: | ||
取得したクエリは ``self.query_strs`` に加えられる。 | 取得したクエリは ``self.query_strs`` に加えられる。 | ||
""" | """ | ||
logger.info('検索クエリを入れるナリ。複数ある時は一つの塊ごとに入力するナリ。空欄を入れると検索開始ナリ。') | |||
logger.info('例:「無能」→改行→「脱糞」→改行→「弟殺し」→改行→空欄で改行') | |||
query_input: str = input() | query_input: str = input() | ||
# 空欄が押されるまでユーザー入力受付 | |||
while query_input != '': | while query_input != '': | ||
self.query_strs.append(query_input) | self.query_strs.append(query_input) | ||
query_input = input() | query_input = input() | ||
logger.info('クエリのピースが埋まっていく。') | |||
def _fail(self) -> NoReturn: | def _fail(self) -> NoReturn: | ||
779行目: | 994行目: | ||
取得に成功した分だけファイルにダンプし、プログラムを終了する。 | 取得に成功した分だけファイルにダンプし、プログラムを終了する。 | ||
""" | """ | ||
logger.critical('接続失敗しすぎで強制終了ナリ') | |||
if | if self._table_builder.count > 0: # 取得成功したデータがあれば発行 | ||
logger.critical('取得成功した分だけ発行しますを') | |||
self._table_builder.dump_file() | |||
exit(1) | |||
exit( | |||
def _stop_word(self) -> str: | def _stop_word(self) -> str: | ||
814行目: | 1,006行目: | ||
str: ツイートの記録を中断するための文。ツイート内容の一文がその文と一致したら記録を中断する。 | str: ツイートの記録を中断するための文。ツイート内容の一文がその文と一致したら記録を中断する。 | ||
""" | """ | ||
logger.info( | |||
end_str: Final[str] = input() | f'ここにツイートの本文を入れる!すると・・・・なんとそれを含むツイートで記録を中断する!(これは本当の仕様です。)ちなみに、空欄だと{self.LIMIT_N_TWEETS}件まで終了しない。') | ||
end_str: Final[str] = input() | |||
return end_str | return end_str | ||
def _download_media(self, media_name: | def _download_media( | ||
self, | |||
media_name: str, | |||
accessor: AccessorHandler) -> bool: | |||
"""ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | """ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | ||
Args: | Args: | ||
media_name | media_name str: 画像ファイル名。Nitter上のimgタグのsrc属性では、``/pic/media%2F`` に後続する。 | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
832行目: | 1,028行目: | ||
image_bytes: Final[bytes | None] = accessor.request_image(url) | image_bytes: Final[bytes | None] = accessor.request_image(url) | ||
if image_bytes is not None: | if image_bytes is not None: | ||
with open(os.path.join(self.MEDIA_DIR, media_name), | with open(os.path.join(self.MEDIA_DIR, media_name), 'wb') as f: | ||
f.write(image_bytes) | f.write(image_bytes) | ||
return True | return True | ||
838行目: | 1,034行目: | ||
return False | return False | ||
def _tweet_date(self, tweet: | def _tweet_date(self, tweet: Tag) -> datetime: | ||
"""ツイートの時刻を取得する。 | """ツイートの時刻を取得する。 | ||
Args: | Args: | ||
tweet | tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | ||
Returns: | Returns: | ||
datetime: ツイートの時刻。 | datetime: ツイートの時刻。 | ||
""" | """ | ||
tweet_date: Tag | NavigableString | None = tweet.find( | |||
date: datetime = datetime.strptime(date_str, '%b %d, %Y · %I:%M %p %Z').replace(tzinfo=ZoneInfo('UTC')).astimezone(ZoneInfo('Asia/Tokyo')) | class_='tweet-date') | ||
assert isinstance(tweet_date, Tag) | |||
tweet_date_a: Tag | None = tweet_date.a | |||
assert tweet_date_a is not None | |||
date_str: str | list[str] | None = tweet_date_a.get('title') | |||
assert isinstance(date_str, str) | |||
date: datetime = datetime.strptime( | |||
date_str, | |||
'%b %d, %Y · %I:%M %p %Z').replace( | |||
tzinfo=ZoneInfo('UTC')).astimezone( | |||
ZoneInfo('Asia/Tokyo')) | |||
return date | return date | ||
def _get_tweet_media( | |||
def | self, | ||
tweet: Tag, | |||
accessor: AccessorHandler) -> str: | |||
self | |||
"""ツイートの画像や動画を取得する。 | """ツイートの画像や動画を取得する。 | ||
Args: | Args: | ||
tweet | tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
881行目: | 1,070行目: | ||
str: Wiki記法でのファイルへのリンクの文字列。 | str: Wiki記法でのファイルへのリンクの文字列。 | ||
""" | """ | ||
tweet_media: | tweet_media: Tag | None = tweet.select_one( | ||
'.tweet-body > .attachments') # 引用リツイート内のメディアを選択しないように.tweet-body直下の.attachmentsを選択 | |||
media_txt: str = '' | media_txt: str = '' | ||
if tweet_media is not None: | if tweet_media is not None: | ||
888行目: | 1,078行目: | ||
for image_a in tweet_media.select('.attachment.image a'): | for image_a in tweet_media.select('.attachment.image a'): | ||
try: | try: | ||
media_name: str = [group for group in re.search(r'%2F([^%]*\.jpg)|%2F([^%]*\.jpeg)|%2F([^%]*\.png)|%2F([^%]*\.gif)', image_a.get('href')).groups() if group][0] | media_name: str = [ | ||
media_list.append(f | group for group in re.search( | ||
r'%2F([^%]*\.jpg)|%2F([^%]*\.jpeg)|%2F([^%]*\.png)|%2F([^%]*\.gif)', | |||
image_a.get('href')).groups() if group][0] | |||
media_list.append(f'[[ファイル:{media_name}|240px]]') | |||
if self._download_media(media_name, accessor): | if self._download_media(media_name, accessor): | ||
logger.info( | |||
os.path.join( | |||
self.MEDIA_DIR, | |||
media_name) | |||
+ ' をアップロードしなければない。') | |||
else: | else: | ||
logger.info( | |||
urljoin( | |||
'https://pbs.twimg.com/media/', | |||
media_name) | |||
+ ' をアップロードしなければない。') | |||
except AttributeError: | except AttributeError: | ||
tweet_url: str = urljoin(self.TWITTER_URL, re.sub('#[^#]*$', '', tweet.find(class_='tweet-link').get('href'))) | tweet_url: str = urljoin( | ||
self.TWITTER_URL, | |||
media_list.append( | re.sub( | ||
'#[^#]*$', | |||
'', | |||
tweet.find( | |||
class_='tweet-link').get('href'))) | |||
logger.error(f'{tweet_url}の画像が取得できませんでしたを 当職無能') | |||
media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]') | |||
# ツイートの動画の取得。ffmpegが入っていなければ手動で取得すること | # ツイートの動画の取得。ffmpegが入っていなければ手動で取得すること | ||
for i, video_container in enumerate(tweet_media.select('.attachment.video-container')): | for i, video_container in enumerate( | ||
tweet_url: str = urljoin(self.TWITTER_URL, re.sub('#[^#]*$', '', tweet.find(class_='tweet-link').get('href'))) | tweet_media.select('.attachment.video-container')): | ||
tweet_url: str = urljoin( | |||
self.TWITTER_URL, | |||
re.sub( | |||
'#[^#]*$', | |||
'', | |||
tweet.find( | |||
class_='tweet-link').get('href'))) # ツイートのURL作成 | |||
video = video_container.select_one('video') | video = video_container.select_one('video') | ||
if video is None: | if video is None: | ||
logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能') | |||
media_list.append( | media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]') | ||
continue | continue | ||
if subprocess.run(['which', 'ffmpeg'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode != 0: | if subprocess.run(['which', | ||
'ffmpeg'], | |||
media_list.append( | stdout=subprocess.DEVNULL, | ||
else: # ffmpegがある場合 | stderr=subprocess.DEVNULL).returncode != 0: | ||
media_url: str = unquote(re.search(r'[^\/]+$', video.get('data-url')).group(0)) | logger.error(f'ffmpegがないため{tweet_url}の動画が取得できませんでしたを') | ||
media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]') | |||
else: # ffmpegがある場合 | |||
# TODO: ブロックが大きすぎるので別メソッドに切り出す | |||
media_url: str = unquote( | |||
re.search( | |||
r'[^\/]+$', | |||
video.get('data-url')).group(0)) | |||
tweet_id: str = tweet_url.split('/')[-1] | tweet_id: str = tweet_url.split('/')[-1] | ||
# 動画のダウンロード | # 動画のダウンロード | ||
if accessor.proxies is not None: | if accessor.proxies is not None: | ||
returncode: int = subprocess.run([ | returncode: int = subprocess.run( | ||
[ | |||
'ffmpeg', '-y', '-http_proxy', | |||
'accessor.proxies["http"]', '-i', | |||
urljoin(self.NITTER_INSTANCE, media_url), | |||
'-c', 'copy', | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts' | |||
], | |||
stdout=subprocess.DEVNULL).returncode | |||
else: | else: | ||
returncode: int = subprocess.run([ | returncode: int = subprocess.run( | ||
[ | |||
'ffmpeg', '-y', '-i', | |||
urljoin(self.NITTER_INSTANCE, media_url), | |||
'-c', 'copy', | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts' | |||
], | |||
stdout=subprocess.DEVNULL).returncode | |||
# 取得成功したらtsをmp4に変換 | # 取得成功したらtsをmp4に変換 | ||
if returncode == 0: | if returncode == 0: | ||
ts2mp4_returncode: int = subprocess.run([ | ts2mp4_returncode: int = subprocess.run( | ||
[ | |||
'ffmpeg', '-y', '-i', | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts', | |||
'-acodec', 'copy', '-vcodec', 'copy', | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4' | |||
], | |||
stdout=subprocess.DEVNULL).returncode | |||
if ts2mp4_returncode == 0: | if ts2mp4_returncode == 0: | ||
logger.info( | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4をアップロードしなければない。') | |||
else: | else: | ||
logger.info( | |||
media_list.append(f | f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.tsをmp4に変換してアップロードしなければない。') | ||
media_list.append(f'[[ファイル:{tweet_id}_{i}.mp4|240px]]') | |||
else: | else: | ||
logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能') | |||
media_list.append( | media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]') | ||
media_txt = ' '.join(media_list) | media_txt = ' '.join(media_list) | ||
return media_txt | return media_txt | ||
def _get_tweet_quote(self, tweet: | def _get_tweet_quote( | ||
self, | |||
tweet: Tag, | |||
accessor: AccessorHandler) -> str: | |||
"""引用リツイートの引用元へのリンクを取得する。 | """引用リツイートの引用元へのリンクを取得する。 | ||
Args: | Args: | ||
tweet | tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
942行目: | 1,190行目: | ||
str: Archiveテンプレートでラップされた引用元ツイートへのリンク。 | str: Archiveテンプレートでラップされた引用元ツイートへのリンク。 | ||
""" | """ | ||
tweet_quote: Final[ | tweet_quote: Final[Tag | None] = tweet.select_one( | ||
'.tweet-body > .quote.quote-big') # 引用リツイートを選択 | |||
quote_txt: str = '' | quote_txt: str = '' | ||
if tweet_quote is not None: | if tweet_quote is not None: | ||
949行目: | 1,198行目: | ||
link = urljoin(self.TWITTER_URL, link) | link = urljoin(self.TWITTER_URL, link) | ||
quote_txt = self._archive_url(link, accessor) | quote_txt = self._archive_url(link, accessor) | ||
tweet_quote_unavailable: Final[ | tweet_quote_unavailable: Final[Tag | None] = tweet.select_one( | ||
'.tweet-body > .quote.unavailable') # 引用リツイートを選択 | |||
if tweet_quote_unavailable is not None: | if tweet_quote_unavailable is not None: | ||
quote_txt = '(引用元が削除されました)' | quote_txt = '(引用元が削除されました)' | ||
return quote_txt | return quote_txt | ||
def _get_tweet_poll(self, tweet: | def _get_tweet_poll(self, tweet: Tag) -> str: | ||
"""ツイートの投票結果を取得する。 | """ツイートの投票結果を取得する。 | ||
Args: | Args: | ||
tweet | tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | ||
Returns: | Returns: | ||
str: Wiki形式に書き直した投票結果。 | str: Wiki形式に書き直した投票結果。 | ||
""" | """ | ||
tweet_poll: Final[ | tweet_poll: Final[Tag | None] = tweet.select_one( | ||
'.tweet-body > .poll') | |||
poll_txt: str = '' | poll_txt: str = '' | ||
if tweet_poll is not None: | if tweet_poll is not None: | ||
poll_meters: Final[ | poll_meters: Final[ResultSet[Tag]] = tweet_poll.select( | ||
'.poll-meter') | |||
for poll_meter in poll_meters: | for poll_meter in poll_meters: | ||
ratio: str = poll_meter.select_one('.poll-choice-value').text | ratio: str = poll_meter.select_one('.poll-choice-value').text | ||
if 'leader' in poll_meter['class']: | if 'leader' in poll_meter['class']: | ||
poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgba(29, 155, 240, 0.58) 0 {ratio}, transparent {ratio} 100%); font-weight: bold;">' + ratio + ' ' + poll_meter.select_one('.poll-choice-option').text + '</span>' | poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgba(29, 155, 240, 0.58) 0 {ratio}, transparent {ratio} 100%); font-weight: bold;">' + ratio + ' ' + poll_meter.select_one( | ||
'.poll-choice-option').text + '</span>' | |||
else: | else: | ||
poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgb(207, 217, 222) 0 {ratio}, transparent {ratio} 100%);">' + ratio + ' ' + poll_meter.select_one('.poll-choice-option').text + '</span>' | poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgb(207, 217, 222) 0 {ratio}, transparent {ratio} 100%);">' + ratio + ' ' + poll_meter.select_one( | ||
poll_txt += '<br>\n <span style="font-size: small;">' + tweet_poll.select_one('.poll-info').text + '</span>' | '.poll-choice-option').text + '</span>' | ||
poll_txt += '<br>\n <span style="font-size: small;">' + \ | |||
tweet_poll.select_one('.poll-info').text + '</span>' | |||
return poll_txt | return poll_txt | ||
def _get_timeline_items(self, soup: BeautifulSoup) -> list[ | def _get_timeline_items(self, | ||
soup: BeautifulSoup) -> list[Tag]: | |||
"""タイムラインのツイートを取得。 | """タイムラインのツイートを取得。 | ||
985行目: | 1,241行目: | ||
Returns: | Returns: | ||
list[ | list[Tag]: ツイートのアイテムである ``.timeline-item`` タグを表すTagオブジェクトのリスト。 | ||
""" | """ | ||
timeline_item_list: list[ | timeline_item_list: list[Tag] = [] | ||
for item_or_list in soup.select('.timeline > .timeline-item, .timeline > .thread-line'): | for item_or_list in soup.select( | ||
'.timeline > .timeline-item, .timeline > .thread-line'): | |||
if 'unavailable' in item_or_list.attrs['class']: | if 'unavailable' in item_or_list.attrs['class']: | ||
continue | continue | ||
1,007行目: | 1,264行目: | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
""" | """ | ||
soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser') | soup: Final[BeautifulSoup] = BeautifulSoup( | ||
tweets: Final[list[ | self._page, 'html.parser') # beautifulsoupでレスポンス解析 | ||
for tweet in tweets: | tweets: Final[list[Tag]] = self._get_timeline_items( | ||
if tweet.a.text == self.NEWEST: | soup) # 一ツイートのブロックごとにリストで取得 | ||
for tweet in tweets: # 一ツイート毎に処理 | |||
if tweet.a.text == self.NEWEST: | |||
# Load Newestのボタンは処理しない | |||
continue | continue | ||
if tweet.find(class_='retweet-header') is not None: | if tweet.find(class_='retweet-header') is not None: | ||
# retweet-headerはリツイートを示すので入っていれば処理しない | |||
continue | continue | ||
if tweet.find(class_='pinned') is not None: | if tweet.find(class_='pinned') is not None: | ||
# pinnedは固定ツイートを示すので入っていれば処理しない | |||
continue | continue | ||
if len(self.query_strs) > 0: # クエリが指定されている場合、一つでも含まないツイートは処理しない、TODO: 未テスト | if len(self.query_strs) > 0: | ||
# クエリが指定されている場合、一つでも含まないツイートは処理しない、TODO: 未テスト | |||
not_match: bool = False | not_match: bool = False | ||
for query_str in query_strs: | for query_str in self.query_strs: | ||
if query_str not in tweet.text: | if query_str not in tweet.text: | ||
not_match = True | not_match = True | ||
1,025行目: | 1,288行目: | ||
continue | continue | ||
tweet_url: str = urljoin(self.TWITTER_URL, re.sub('#[^#]*$', '', tweet.find(class_='tweet-link').get('href'))) | tweet_url: str = urljoin( | ||
self.TWITTER_URL, | |||
re.sub( | |||
'#[^#]*$', | |||
'', | |||
tweet.find( | |||
class_='tweet-link').get('href'))) | |||
date: datetime = self._tweet_date(tweet) | date: datetime = self._tweet_date(tweet) | ||
self._table_builder.next_day_if_necessary(date) | |||
archived_tweet_url: str = self._callinshowlink_url( | |||
archived_tweet_url: str = self._callinshowlink_url(tweet_url, accessor) | tweet_url, accessor) | ||
tweet_content: | tweet_content: Tag = tweet.find( | ||
self._archive_soup(tweet_content, accessor) | class_='tweet-content media-body') | ||
media_txt: str = self._get_tweet_media(tweet, accessor) | self._archive_soup(tweet_content, accessor) | ||
quote_txt: str = self._get_tweet_quote(tweet, accessor) | media_txt: str = self._get_tweet_media(tweet, accessor) | ||
poll_txt: str = self._get_tweet_poll(tweet) | quote_txt: str = self._get_tweet_quote(tweet, accessor) | ||
self. | poll_txt: str = self._get_tweet_poll(tweet) | ||
self._table_builder.append( | |||
archived_tweet_url, '<br>\n'.join( | |||
filter( | |||
None, [ | |||
self._table_builder.escape_wiki_reserved_words( | |||
tweet_content.get_text()), quote_txt, media_txt, poll_txt]))) | |||
if self._table_builder.count % self.REPORT_INTERVAL == 0: | |||
logger.info( | |||
f'ツイートを{self._table_builder.count}件も記録したンゴwwwwwwwwwww') | |||
if self._stop != '' and self._stop in tweet_content.get_text(): # 目的ツイートか判定 | |||
logger.info('目的ツイート発見でもう尾張屋根') | |||
self._table_builder.dump_file() | |||
if self._table_builder.count >= self.LIMIT_N_TWEETS: | |||
logger.info(f'{self.LIMIT_N_TWEETS}件も記録している。もうやめにしませんか。') | |||
self._table_builder.dump_file() | |||
def _archive_soup(self, tag: | def _archive_soup( | ||
self, | |||
tag: Tag, | |||
accessor: AccessorHandler) -> None: | |||
"""ツイート内のaタグをテンプレートArchiveの文字列に変化させる。 | """ツイート内のaタグをテンプレートArchiveの文字列に変化させる。 | ||
1,105行目: | 1,331行目: | ||
Args: | Args: | ||
tag | tag Tag: ツイート内容の本文である ``.media-body`` タグを表すbeautifulsoup4タグ。 | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
""" | """ | ||
urls_in_tweet: Final[ | urls_in_tweet: Final[ResultSet[Tag]] = tag.find_all( | ||
'a') | |||
for url in urls_in_tweet: | for url in urls_in_tweet: | ||
href: str | list[str] | None = url.get('href') | |||
if | assert isinstance(href, str) | ||
#Nitter上のTwitterへのリンクを直す | |||
url_link: str = | if href.startswith('https://') or href.startswith('http://'): | ||
url_link = re.sub('\?.*$', '', url_link) | # 先頭にhttpが付いていない物はハッシュタグの検索ページへのリンクなので処理しない | ||
url.replace_with(self._archive_url(url_link, accessor)) | if href.startswith('https' + self.NITTER_INSTANCE[4:]): | ||
elif | # Nitter上のTwitterへのリンクを直す | ||
#Nitter上のTwitterへのリンクを直す | url_link: str = href.replace( | ||
url_link: str = | 'https' + self.NITTER_INSTANCE[4:], self.TWITTER_URL) | ||
url_link = re.sub('\?.*$', '', url_link) | url_link = re.sub('\\?.*$', '', url_link) | ||
url.replace_with(self._archive_url(url_link, accessor)) | url.replace_with(self._archive_url(url_link, accessor)) | ||
elif self._invidious_pattern.search( | elif href.startswith('https://nitter.kavin.rocks/'): | ||
#Nitter上のYouTubeへのリンクをInvidiousのものから直す | # Nitter上のTwitterへのリンクを直す | ||
url_link: str = href.replace( | |||
if re.match('https://[^/]+/[^/]+/', | 'https://nitter.kavin.rocks/', self.TWITTER_URL) | ||
url_link = re.sub('\\?.*$', '', url_link) | |||
url.replace_with(self._archive_url(url_link, accessor)) | |||
elif self._invidious_pattern.search(href): | |||
# Nitter上のYouTubeへのリンクをInvidiousのものから直す | |||
if re.match( | |||
'https://[^/]+/[^/]+/', | |||
href) or re.search( | |||
'/@[^/]*$', | |||
href): | |||
href = self._invidious_pattern.sub('youtube.com', href) | |||
else: | else: | ||
href = self._invidious_pattern.sub('youtu.be', href) | |||
url.replace_with(self._archive_url( | url.replace_with(self._archive_url(href, accessor)) | ||
elif | elif href.startswith('https://bibliogram.art/'): | ||
# Nitter上のInstagramへのリンクをBibliogramのものから直す | # Nitter上のInstagramへのリンクをBibliogramのものから直す | ||
# Bibliogramは中止されたようなのでそのうちリンクが変わるかも | # Bibliogramは中止されたようなのでそのうちリンクが変わるかも | ||
url_link: str = | url_link: str = href.replace( | ||
url.replace_with(self._archive_url(url_link, accessor)) | 'https://bibliogram.art/', | ||
'https://www.instagram.com/') | |||
url.replace_with(self._archive_url(url_link, accessor)) | |||
else: | else: | ||
url.replace_with(self._archive_url( | url.replace_with(self._archive_url(href, accessor)) | ||
elif url.text.startswith('@'): | elif url.text.startswith('@'): | ||
url_link: str = urljoin(self.TWITTER_URL, | url_link: str = urljoin(self.TWITTER_URL, href) | ||
url_text: str = url.text | url_text: str = url.text | ||
url.replace_with(self._archive_url(url_link, accessor, url_text)) | url.replace_with( | ||
self._archive_url( | |||
url_link, accessor, url_text)) | |||
def _archive_url(self, url: | def _archive_url( | ||
self, | |||
url: str, | |||
accessor: AccessorHandler, | |||
text: str | None = None) -> str: | |||
"""URLをArchiveテンプレートでラップする。 | """URLをArchiveテンプレートでラップする。 | ||
1,147行目: | 1,391行目: | ||
Args: | Args: | ||
url | url str: ラップするURL。 | ||
accessor AccessorHandler: | accessor AccessorHandler: アクセスハンドラ。 | ||
text | text str|None: ArchiveテンプレートでURLの代わりに表示する文字列。 | ||
Returns: | Returns: | ||
str: ArchiveタグでラップしたURL。 | str: ArchiveタグでラップしたURL。 | ||
""" | """ | ||
if '#' in url: # フラグメント識別子の処理 | if '#' in url: # フラグメント識別子の処理 | ||
main_url, fragment = url.split('#', maxsplit=1) | main_url, fragment = url.split('#', maxsplit=1) | ||
return self._table_builder.archive_url( | |||
url, self._archive(main_url, accessor) + '#' + fragment, text) | |||
else: | else: | ||
return self._table_builder.archive_url( | |||
url, self._archive(url, accessor), text) | |||
def _callinshowlink_url(self, url: | def _callinshowlink_url(self, url: str, accessor: AccessorHandler) -> str: | ||
"""URLをCallinShowLinkテンプレートでラップする。 | """URLをCallinShowLinkテンプレートでラップする。 | ||
Args: | Args: | ||
url | url str: ラップするURL。 | ||
accessor AccessorHandler: | accessor AccessorHandler: アクセスハンドラ。 | ||
Returns: | Returns: | ||
str: CallinShowLinkタグでラップしたURL。 | str: CallinShowLinkタグでラップしたURL。 | ||
""" | """ | ||
return | return self._table_builder.callinshowlink_url( | ||
url, self._archive(url, accessor)) | |||
def _archive(self, url: | def _archive(self, url: str, accessor: AccessorHandler) -> str: | ||
"""URLから対応するarchive.todayのURLを返す。 | """URLから対応するarchive.todayのURLを返す。 | ||
1,186行目: | 1,427行目: | ||
Args: | Args: | ||
url | url str: 魚拓を取得するURL。 | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
1,192行目: | 1,433行目: | ||
str: 魚拓のURL。 | str: 魚拓のURL。 | ||
""" | """ | ||
archive_url: str = urljoin(self.ARCHIVE_TODAY_STANDARD, quote(unquote(url), safe='&=+?%')) | archive_url: str = urljoin( | ||
res: Final[str | None] = accessor.request(urljoin(self.ARCHIVE_TODAY, quote(unquote(url), safe='&=+?%'))) | self.ARCHIVE_TODAY_STANDARD, | ||
if res is None: | quote( | ||
unquote(url), | |||
safe='&=+?%')) # wikiに載せるとき用URLで失敗するとこのままhttps://archive.ph/https%3A%2F%2Fxxxxxxxxの形で返される | |||
res: Final[str | None] = accessor.request(urljoin( | |||
self.ARCHIVE_TODAY, quote(unquote(url), safe='&=+?%'))) | |||
if res is None: # 魚拓接続失敗時処理 | |||
logger.error(archive_url + 'にアクセス失敗ナリ。出力されるテキストにはそのまま記載されるナリ。') | |||
else: | else: | ||
soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') | soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') | ||
content: | content: Tag = soup.find( | ||
if content is None or content.get_text()[:len(self.NO_ARCHIVE)] == self.NO_ARCHIVE: | id='CONTENT') # archive.todayの魚拓一覧ページの中身だけ取得 | ||
if content is None or content.get_text()[:len( | |||
self.NO_ARCHIVE)] == self.NO_ARCHIVE: # 魚拓があるかないか判定 | |||
logger.warning(url + 'の魚拓がない。これはいけない。') | |||
else: | else: | ||
archive_url = content.find('a').get('href').replace(self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD) | archive_url = content.find('a').get('href').replace( | ||
self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD) | |||
return archive_url | return archive_url | ||
1,213行目: | 1,462行目: | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
""" | """ | ||
soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser') | soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser') | ||
show_mores: Final[ | show_mores: Final[ResultSet[Tag] | ||
new_url: str = '' # ここで定義しないと動かなくなった、FIXME? | ] = soup.find_all(class_='show-more') | ||
for show_more in show_mores: | new_url: str = '' # ここで定義しないと動かなくなった、FIXME? | ||
if show_more.text != self.NEWEST: | for show_more in show_mores: # show-moreに次ページへのリンクか前ページへのリンクがある | ||
new_url = urljoin(self.NITTER_INSTANCE, self. | if show_more.text != self.NEWEST: # 前ページへのリンクではないか判定 | ||
res: Final[str | None] = accessor.request(new_url) | new_url = urljoin( | ||
self.NITTER_INSTANCE, | |||
self._name | |||
+ '/' | |||
+ self.TWEETS_OR_REPLIES | |||
+ show_more.a.get('href')) # 直下のaタグのhrefの中身取ってURL頭部分と合体 | |||
res: Final[str | None] = accessor.request(new_url) | |||
if res is None: | if res is None: | ||
self._fail() | self._fail() | ||
new_page_soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') | new_page_soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') | ||
if new_page_soup.find(class_= | if new_page_soup.find( | ||
class_='timeline-end') is None: | |||
self._page = res | # ツイートの終端ではtimeline-endだけのページになるので判定 | ||
logger.info(new_url + 'に移動しますを') | |||
self._page = res # まだ残りツイートがあるのでページを返して再度ツイート本文収集 | |||
else: | else: | ||
logger.info('急に残りツイートが無くなったな終了するか') | |||
self. | self._table_builder.dump_file() | ||
def execute(self, krsw: bool=False, use_browser: bool=True, enable_javascript: bool=True) -> NoReturn: | def execute(self, krsw: bool = False, use_browser: bool = True, | ||
enable_javascript: bool = True) -> NoReturn: | |||
"""通信が必要な部分のロジック。 | """通信が必要な部分のロジック。 | ||
1,243行目: | 1,501行目: | ||
self._check_nitter_instance(accessor) | self._check_nitter_instance(accessor) | ||
self._check_archive_instance(accessor) | self._check_archive_instance(accessor) | ||
# Invidiousのインスタンスリストの正規表現パターンを取得 | |||
invidious_url_tuple: Final[tuple[str]] = self._invidious_instances(accessor) | invidious_url_tuple: Final[tuple[str, ...] | ||
self._invidious_pattern: | ] = self._invidious_instances(accessor) | ||
self._invidious_pattern: re.Pattern[str] = re.compile( | |||
'|'.join(invidious_url_tuple)) | |||
# 検索クエリの設定 | # 検索クエリの設定 | ||
1,257行目: | 1,517行目: | ||
if __name__ == '__main__': | if __name__ == '__main__': | ||
if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < | if sys.version_info.major < 3 or ( | ||
sys.version_info.major == 3 and sys.version_info.minor < 11): | |||
logger.critical('Pythonのバージョンを3.11以上に上げて下さい') | |||
exit(1) | exit(1) | ||
parser: ArgumentParser = ArgumentParser() | parser: ArgumentParser = ArgumentParser() | ||
parser.add_argument( | parser.add_argument( | ||
parser.add_argument( | '--krsw', | ||
parser.add_argument( | action='store_true', | ||
help='指定すると、パカデブのツイートを取得上限数まで取得する。') | |||
parser.add_argument( | |||
'-n', | |||
'--no_browser', | |||
action='store_true', | |||
help='指定すると、Tor Browserを利用しない。') | |||
parser.add_argument( | |||
'-d', | |||
'--disable_script', | |||
action='store_true', | |||
help='指定すると、Tor BrowserでJavaScriptを利用しない。') | |||
args: Namespace = parser.parse_args() | args: Namespace = parser.parse_args() | ||
twitter_archiver: TwitterArchiver = TwitterArchiver() | twitter_archiver: TwitterArchiver = TwitterArchiver() | ||
twitter_archiver.execute(args.krsw, not args. | twitter_archiver.execute( | ||
args.krsw, | |||
not args.no_browser, | |||
not args.disable_script) | |||
</syntaxhighlight> | </syntaxhighlight> | ||
2023年9月28日 (木) 02:25時点における版
とりあえず取り急ぎ。バグ報告は利用者・トーク:夜泣き
コード
#!/usr/bin/env python3
"""Twitter自動収集スクリプト
ver4.0.3 2023/9/28恒心
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です
前開発者との出会いに感謝
Examples:
定数類は状況に応じて変えてください。
::
$ python3 (ファイル名)
オプションに ``--krsw`` とつけると自動モードになります。
::
$ python3 (ファイル名) --krsw
``--no_browser`` オプションでTor Browserを使用しないモードに、
``--disable_script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。
自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。
つまりユーザー入力が要りません。
Note:
* Pythonのバージョンは3.11以上
* 環境は玉葱前提です。
* Whonix-Workstation, MacOSで動作確認済
* MacOSの場合はbrewでtorコマンドを導入し、実行
* PySocks, bs4, seleniumはインストールしないと標準で入ってません
* requestsも環境によっては入っていないかもしれない
* $ pip install bs4 requests PySocks selenium
* pipも入っていなければ ``$ sudo apt install pip``
* `ffmpeg <https://ffmpeg.org>`_ が入っていると動画も自動取得しますが、無くても動きます
* バグ報告は `利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて <https://krsw-wiki.org/wiki/利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて>`_
"""
import codecs
import json
import logging
import os
import platform
import random
import re
import subprocess
import sys
import warnings
from argparse import ArgumentParser, Namespace
from collections.abc import Callable
from datetime import datetime
from logging import Logger, getLogger
from time import sleep
from types import MappingProxyType, TracebackType
from typing import Any, Final, NoReturn, Self
from urllib.parse import quote, unquote, urljoin
from zoneinfo import ZoneInfo
import requests
from bs4 import BeautifulSoup
from bs4.element import NavigableString, ResultSet, Tag
from selenium import webdriver
from selenium.common.exceptions import (NoSuchElementException,
WebDriverException)
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.wait import WebDriverWait
logging.basicConfig(level=logging.INFO,
format='{asctime} [{levelname:.4}] : {message}', style='{')
logger: Logger = getLogger(__name__)
# おそらくはツイートの内容によってMarkupResemblesLocatorWarningが吐き出されることがあるので無効化
warnings.simplefilter('ignore')
class AccessError(Exception):
"""RequestsとSeleniumで共通のアクセスエラー。
"""
pass
class ReCaptchaRequiredError(Exception):
"""JavaScriptがオフの時にreCAPTCHAを要求された場合のエラー。
"""
pass
class AbstractAccessor:
"""HTTPリクエストでWebサイトに接続するための基底クラス。
"""
WAIT_TIME: Final[int] = 1
"""Final[int]: HTTPリクエスト成功失敗関わらず待機時間。
1秒待つだけで行儀がいいクローラーだそうなので既定では1秒。
しかし日本のポリホーモは1秒待っていても捕まえてくるので注意。
https://ja.wikipedia.org/wiki/?curid=2187212
"""
WAIT_RANGE: Final[int] = 5
"""Final[int]: ランダムな時間待機するときの待機時間の幅。
"""
REQUEST_TIMEOUT: Final[int] = 30
"""Final[int]: HTTPリクエストのタイムアウト秒数。
"""
def _random_sleep(self) -> None:
"""ランダムな秒数スリープする。
自動操縦だとWebサイトに見破られないため。
"""
sleep(random.randrange(self.WAIT_TIME,
self.WAIT_TIME + self.WAIT_RANGE))
class RequestsAccessor(AbstractAccessor):
"""requestsモジュールでWebサイトに接続するためのクラス。
"""
HEADERS: Final[dict[str, str]] = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'
}
"""Final[dict[str, str]]: HTTPリクエスト時のヘッダ。
"""
PROXIES_WITH_COMMAND: Final[dict[str, str]] = {
'http': 'socks5h://127.0.0.1:9050',
'https': 'socks5h://127.0.0.1:9050'
}
"""Final[dict[str, str]]: torコマンドを起動しているときのHTTPプロキシの設定。
"""
PROXIES_WITH_BROWSER: Final[dict[str, str]] = {
'http': 'socks5h://127.0.0.1:9150',
'https': 'socks5h://127.0.0.1:9150'
}
"""Final[dict[str, str]]: Tor Browserを起動しているときのHTTPプロキシの設定。
"""
TOR_CHECK_URL: Final[str] = 'https://check.torproject.org/api/ip'
"""Final[str]: Tor経由で通信しているかチェックするサイトのURL。
"""
def __init__(self):
"""コンストラクタ。
"""
self._proxies: dict[str, str] | None = None
self._proxies = self._choose_tor_proxies() # Torに必要なプロキシをセット
def _execute(self,
url: str,
proxies: dict[str,
str] | None) -> requests.models.Response:
"""引数のURLにrequestsモジュールでHTTP接続する。
Args:
url str: 接続するURL。
proxies dir[str, str]: 接続に利用するプロキシ。
デフォルトでは :func:`~_choose_tor_proxies` で設定した値を利用する。
Returns:
requests.models.Response: レスポンスのオブジェクト。
Raises:
requests.exceptions.ConnectionError: Torで接続ができないなど、接続のエラー。
AccessError: ステータスコードが200でない場合のエラー。
"""
sleep(self.WAIT_TIME) # DoS対策で待つ
try:
res: requests.models.Response = requests.get(
url,
timeout=self.REQUEST_TIMEOUT,
headers=self.HEADERS,
allow_redirects=False,
proxies=proxies if proxies is not None else self._proxies)
res.raise_for_status() # HTTPステータスコードが200番台以外でエラー発生
except requests.exceptions.ConnectionError:
raise
except requests.exceptions.RequestException as e:
# requestsモジュール固有の例外を共通の例外に変換
raise AccessError(str(e)) from e
return res
def get(self,
url: str,
proxies: dict[str,
str] | None = None) -> str:
"""引数のURLにrequestsモジュールでHTTP接続する。
Args:
url str: 接続するURL。
proxies dir[str, str]: 接続に利用するプロキシ。デフォルトでは :func:`~_choose_tor_proxies` で設定した値を利用する。
Returns:
str: レスポンスのHTML。
Raises:
requests.exceptions.ConnectionError: Torで接続ができないなど、接続のエラー。
AccessError: ステータスコードが200でない場合のエラー。
"""
try:
return self._execute(url, proxies).text
except (requests.exceptions.ConnectionError, AccessError):
raise
def get_image(self, url: str) -> bytes | None:
"""引数のURLから画像のバイナリ列を取得する。
Args:
url str: 接続するURL
Returns:
bytes | None: 画像のバイナリ。画像でなければNone。
Raises:
requests.exceptions.ConnectionError: Torで接続ができないなど、接続のエラー。
AccessError: ステータスコードが200でない場合のエラー。
"""
try:
res: requests.models.Response = self._execute(url, self._proxies)
except (requests.exceptions.ConnectionError, AccessError):
raise
if 'image' in res.headers['content-type']:
return res.content
else:
return None
def _choose_tor_proxies(self) -> dict[str, str] | None | NoReturn:
"""Torを使うのに必要なプロキシ情報を返す。
プロキシなしで接続できればNone、
Tor Browserのプロキシで接続できるなら :const:`~PROXIES_WITH_BROWSER`、
torコマンドのプロキシで接続できるなら :const:`~PROXIES_WITH_COMMAND`。
いずれでもアクセスできなければ異常終了する。
Returns:
dict[str, str] | None | NoReturn: プロキシ情報。
Raises:
RuntimeError: :const:`~TOR_CHECK_URL` にアクセスできない場合のエラー。
"""
logger.info('Torのチェック中ですを')
# プロキシなしでTorにアクセスできるかどうか
res: str = self.get(self.TOR_CHECK_URL, proxies=None)
is_tor: bool = json.loads(res)['IsTor']
if is_tor:
logger.info('Tor connection OK')
return None
# Tor BrowserのプロキシでTorにアクセスできるかどうか
try:
res = self.get(self.TOR_CHECK_URL,
proxies=self.PROXIES_WITH_BROWSER)
is_tor = json.loads(res)['IsTor']
if is_tor:
logger.info('Tor connection OK')
return self.PROXIES_WITH_BROWSER
except requests.exceptions.ConnectionError:
pass
# torコマンドのプロキシでTorにアクセスできるかどうか
try:
res = self.get(self.TOR_CHECK_URL,
proxies=self.PROXIES_WITH_COMMAND)
is_tor = json.loads(res)['IsTor']
if is_tor:
logger.info('Tor proxy OK')
return self.PROXIES_WITH_COMMAND
else:
raise RuntimeError('サイトにTorのIPでアクセスできていないなりを')
except requests.exceptions.ConnectionError as e:
logger.critical(e)
logger.critical('通信がTorのSOCKS proxyを経由していないなりを')
exit(1)
@property
def proxies(self) -> dict[str, str] | None:
"""オブジェクトのプロキシ設定を返す。
Returns:
dict[str, str] | None: プロキシ設定。
"""
return self._proxies
class SeleniumAccessor(AbstractAccessor):
"""SeleniumでWebサイトに接続するためのクラス。
"""
TOR_BROWSER_PATHS: MappingProxyType[str, str] = MappingProxyType({
'Windows': '',
'Darwin': '/Applications/Tor Browser.app/Contents/MacOS/firefox',
'Linux': '/usr/bin/torbrowser'
})
"""MappingProxyType[str, str]: OSごとのTor Browserのパス。
Todo:
WindowsとLinuxでのTor Browserの実行パスを追加する。
"""
WAIT_TIME_FOR_INIT: Final[int] = 15
"""Final[int]: 最初のTor接続時の待機時間。
"""
def __init__(self, enable_javascript: bool):
"""コンストラクタ。
Tor Browserを自動操縦するためのSeleniumドライバを初期化する。
Args:
enable_javascript bool: JavaScriptを有効にするかどうか。reCAPTCHA対策に必要。
"""
self._javascript_enabled: bool = enable_javascript
options: Options = Options()
options.binary_location = self.TOR_BROWSER_PATHS[platform.system()]
if enable_javascript:
logger.warning('reCAPTCHA対策のためJavaScriptをonにしますを')
options.preferences.update({
'javascript.enabled': enable_javascript,
'intl.accept_languages': 'en-US, en',
'intl.locale.requested': 'US',
'font.language.group': 'x-western',
'dom.webdriver.enabled': False # 自動操縦と見破られないための設定
})
try:
self.driver: webdriver.Firefox = webdriver.Firefox(options=options)
sleep(1)
wait_init: WebDriverWait = WebDriverWait(self.driver,
self.WAIT_TIME_FOR_INIT)
wait_init.until(
ec.element_to_be_clickable((By.ID, 'connectButton'))
)
self.driver.find_element(By.ID, 'connectButton').click()
wait_init.until(ec.url_contains('about:blank')) # Torの接続が完了するまで待つ
self.wait: WebDriverWait = WebDriverWait(self.driver,
self.REQUEST_TIMEOUT)
except BaseException:
self.quit()
raise
def quit(self) -> None:
"""Seleniumドライバを終了する。
"""
if self.driver:
self.driver.quit()
def _check_recaptcha(self) -> None:
"""reCAPTCHAが表示されているかどうか判定して、入力を待機する。
Raises:
ReCaptchaRequiredError: JavaScriptがオフの状態でreCAPTCHAが要求された場合のエラー。
Todo:
botバレしたときに自動で他のTorサーキットに接続し直す。
"""
try:
self.driver.find_element( # 要素がない時に例外を吐く
By.CSS_SELECTOR,
'script[src^="https://www.google.com/recaptcha/api.js"]')
if self._javascript_enabled:
logger.warning('reCAPTCHAを解いてね(笑)、それはできるよね。')
logger.warning('botバレしたらNew Tor circuit for this siteを選択するナリよ')
WebDriverWait(
self.driver,
10000).until(
ec.staleness_of(
self.driver.find_element(
By.CSS_SELECTOR,
'script[src^="https://www.google.com/recaptcha/api.js"]')))
sleep(self.WAIT_TIME) # DoS対策で待つ
else:
raise ReCaptchaRequiredError(
'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: '
+ self.driver.current_url)
except NoSuchElementException:
# reCAPTCHAの要素がなければそのまま
pass
def get(self, url: str) -> str:
"""引数のURLにSeleniumでHTTP接続する。
Args:
url str: 接続するURL。
Returns:
str: レスポンスのHTML。
"""
self._random_sleep() # DoS対策で待つ
try:
self.driver.get(url)
self._check_recaptcha()
except WebDriverException as e:
# Selenium固有の例外を共通の例外に変換
raise AccessError(str(e)) from e
return self.driver.page_source
class AccessorHandler:
"""WebサイトからHTMLを取得するためのクラス。
RequestsとSeleniumのどちらかを選択して使用することができ、その違いを隠蔽する。
"""
LIMIT_N_REQUESTS: Final[int] = 5
"""Final[int]: HTTPリクエスト失敗時の再試行回数。
"""
WAIT_TIME_FOR_ERROR: Final[int] = 4
"""Final[int]: HTTPリクエスト失敗時にさらに追加する待機時間。
"""
def __init__(self, use_browser: bool, enable_javascript: bool):
"""コンストラクタ。
Requestのみを利用するか、Seleniumも利用するか引数で選択して初期化する。
Args:
use_browser bool: TrueならSeleniumを利用する。FalseならRequestsのみでアクセスする。
enable_javascript bool: SeleniumでJavaScriptを利用する場合はTrue。
"""
self.selenium_accessor: SeleniumAccessor | None = SeleniumAccessor(
enable_javascript) if use_browser else None
self.requests_accessor: RequestsAccessor = RequestsAccessor()
def __enter__(self) -> Self:
"""withブロックの開始時に実行する。
Returns:
Self: オブジェクト自身。
"""
return self
def __exit__(self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None) -> None:
"""withブロックの終了時に実行する。
Args:
exc_type (type[BaseException] | None): コンテキスト内で例外を吐いた場合の例外タイプ。
exc_value (BaseException | None): コンテキスト内で例外を吐いた場合の例外。
traceback (TracebackType | None): コンテキスト内で例外を吐いた場合のトレースバック。
"""
if self.selenium_accessor is not None:
self.selenium_accessor.quit()
def request_once(self, url: str) -> str:
"""引数のURLにHTTP接続する。
Args:
url str: 接続するURL。
Returns:
str: レスポンスのテキスト。
Raises:
AccessError: アクセスエラー。
Note:
失敗かどうかは呼出側で要判定。
"""
try:
if self.selenium_accessor is not None:
return self.selenium_accessor.get(url)
else:
return self.requests_accessor.get(url)
except AccessError:
raise
def _request_with_callable(self,
url: str,
request_callable: Callable[[str],
Any]) -> Any | None:
"""request_callableの実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。
成功すると結果を返す。
接続失敗が何度も起きるとNoneを返す。
Args:
url str: 接続するURL
request_callable Callable[[str], Any]: 1回リクエストを行うメソッド。
Returns:
Any | None: レスポンス。接続失敗が何度も起きるとNoneを返す。
"""
for i in range(1, self.LIMIT_N_REQUESTS + 1):
try:
res: Any = request_callable(url)
except AccessError:
logger.warning(
url
+ 'への通信失敗ナリ '
+ f'{i}/{self.LIMIT_N_REQUESTS}回')
if i < self.LIMIT_N_REQUESTS:
sleep(self.WAIT_TIME_FOR_ERROR) # 失敗時は長めに待つ
else:
return res
return None
def request(self, url: str) -> str | None:
"""HTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。
成功すると結果を返す。
接続失敗が何度も起きるとNoneを返す。
Args:
url str: 接続するURL
Returns:
str | None: レスポンスのテキスト。接続失敗が何度も起きるとNoneを返す。
Note:
失敗かどうかは呼出側で要判定
"""
return self._request_with_callable(url, self.request_once)
def request_with_requests_module(self, url: str) -> str | None:
"""requestsモジュールでのHTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。
成功すると結果を返す。
接続失敗が何度も起きるとNoneを返す。
Args:
url str: 接続するURL
Returns:
str | None: レスポンスのテキスト。接続失敗が何度も起きるとNoneを返す。
Note:
失敗かどうかは呼出側で要判定
"""
return self._request_with_callable(url, self.requests_accessor.get)
def request_image(self, url: str) -> bytes | None:
"""requestsモジュールで画像ファイルを取得します。
成功すると結果を返します。
接続失敗が何度も起きるとNoneを返します。
Args:
url Final[str]: 接続するURL
Returns:
bytes | None: レスポンスのバイト列。接続失敗が何度も起きるとNoneを返します。
"""
return self._request_with_callable(url,
self.requests_accessor.get_image)
@property
def proxies(self) -> dict[str, str] | None:
"""RequestsAccessorオブジェクトのプロキシ設定を返す。
Returns:
dict[str, str] | None: RequestsAccessorオブジェクトのプロキシ設定。
"""
return self.requests_accessor.proxies
class TableBuilder:
def __init__(self, date: datetime):
self._tables: list[str] = ['']
self._count: int = 0 # 記録数
self._date: datetime = date
@property
def count(self) -> int:
return self._count
def append(self, archived_tweet_url: str, text: str) -> None:
self._tables[0] = '!' + archived_tweet_url + '\n|-\n|\n' \
+ text \
+ '\n|-\n' \
+ self._tables[0] # wikiの文法に変化
self._count += 1
def dump_file(self) -> NoReturn:
"""Wikiテーブルをファイル出力し、プログラムを終了する。
"""
self._next_day()
result_txt: Final[str] = '\n'.join(self._tables)
with codecs.open('tweet.txt', 'w', 'utf-8') as f:
f.write(result_txt)
logger.info('テキストファイル手に入ったやで〜')
exit(0)
def next_day_if_necessary(self, date: datetime | None = None) -> None:
if date is None:
return
if date.year != self._date.year or date.month != self._date.month or date.day != self._date.day:
self._next_day(date)
def _next_day(self, date: datetime | None = None) -> None:
if self._tables[0]:
self._tables[0] = self._convert_to_text_table(self._tables[0])
if os.name == 'nt': # Windows
self._txt_data[0] = self._date.strftime(
'\n=== %#m月%#d日 ===\n') + self._txt_data[0]
logger.info(self._date.strftime('%#m月%#d日のツイートを取得完了ですを'))
else: # Mac or Linux
self._tables[0] = self._date.strftime(
'\n=== %-m月%-d日 ===\n') + self._tables[0]
logger.info(self._date.strftime('%-m月%-d日のツイートを取得完了ですを'))
if date is not None:
self._tables.insert(0, '')
self._date = date
def _convert_to_text_table(self, text: str) -> str:
"""``self._txt_data[0]`` にwikiでテーブル表示にするためのヘッダとフッタをつける。
Args:
text str: ヘッダとフッタがないWikiテーブル。
Returns:
str: テーブル表示用のヘッダとフッタがついたWikiテーブル。
"""
return '{|class="wikitable" style="text-align: left;"\n' + text + '|}'
@staticmethod
def escape_wiki_reserved_words(text: str) -> str:
"""MediaWikiの文法と衝突する文字を無効化する。
Args:
text str: ツイートの文字列。
Returns:
str: MediaWikiの文法と衝突する文字がエスケープされたツイートの文字列。
"""
def escape_nolink_urls(text: str) -> str:
"""Archiveテンプレートの中にないURLがWikiでaタグに変換されないよう無効化する。
Args:
text str: ツイートの文字列。
Returns:
str: Archiveテンプレートの中にないURLがnowikiタグでエスケープされた文字列。
"""
is_in_archive_template: bool = False
i: int = 0
while i < len(text):
if is_in_archive_template:
if text[i:i + 2] == '}}':
is_in_archive_template = False
i += 2
else:
if text[i:i + 10] == '{{Archive|' or text[i:i + 10] == '{{archive|':
is_in_archive_template = True
i += 10
elif text[i:i + 8] == 'https://':
text = text[:i] + \
'<nowiki>https://</nowiki>' + text[i + 8:]
i += 25
elif text[i:i + 7] == 'http://':
text = text[:i] + \
'<nowiki>http://</nowiki>' + text[i + 7:]
i += 24
i += 1
return text
text = text.replace('\n', '<br>\n')
text = re.sub(r'^ ', ' ', text, flags=re.MULTILINE)
text = re.sub(
r'^([\*#:;])',
r'<nowiki>\1</nowiki>',
text,
flags=re.MULTILINE)
text = re.sub(
r'^----',
'<nowiki>----</nowiki>',
text,
flags=re.MULTILINE)
text = escape_nolink_urls(text)
return text
@staticmethod
def archive_url(
url: str,
archived_url: str,
text: str | None = None) -> str:
"""URLをArchiveテンプレートでラップする。
Args:
url str: ラップするURL。
archive_url str: ラップするURLの魚拓のURL。
text str | None: ArchiveテンプレートでURLの代わりに表示する文字列。
Returns:
str: ArchiveタグでラップしたURL。
"""
if text is None:
return '{{Archive|1=' + unquote(url) + '|2=' + archived_url + '}}'
else:
return '{{Archive|1=' + \
unquote(url) + '|2=' + archived_url + '|3=' + text + '}}'
@staticmethod
def callinshowlink_url(url: str, archived_url: str) -> str:
"""URLをCallinShowLinkテンプレートでラップする。
Args:
url str: ラップするURL。
archive_url str: ラップするURLの魚拓のURL。
Returns:
str: CallinShowLinkタグでラップしたURL。
"""
return '{{CallinShowLink|1=' + url + '|2=' + archived_url + '}}'
class TwitterArchiver:
"""ツイートをWikiの形式にダンプするクラス。
Nitterからツイートを取得し、Wikiの形式にダンプする。
削除されたツイートや編集前のツイートは取得できない。
"""
NITTER_INSTANCE: Final[str] = 'https://nitter.net/'
"""Final[str]: Nitterのインスタンス。
生きているのは https://github.com/zedeus/nitter/wiki/Instances で確認。
Note:
末尾にスラッシュ必須。
Todo:
Tor専用のインスタンスが使えるようになったら変更する。
"""
ARCHIVE_TODAY: Final[str] = 'http://archiveiya74codqgiixo33q62qlrqtkgmcitqx5u2oeqnmn5bpcbiyd.onion/'
"""Final[str]: archive.todayの魚拓のonionドメイン。
ページにアクセスして魚拓があるか調べるのにはonion版のarchive.todayを使用。
Note:
末尾にスラッシュ必須。
"""
ARCHIVE_TODAY_STANDARD: Final[str] = 'https://archive.vn/'
"""Final[str]: archive.todayの魚拓のクリアネットドメイン。
記事にはクリアネット用のarchive.todayリンクを貼る。
Note:
末尾にスラッシュ必須。
"""
TWITTER_URL: Final[str] = 'https://twitter.com/'
"""Final[str]: TwitterのURL。
Note:
末尾にスラッシュ必須。
"""
CALLINSHOW: Final[str] = 'CallinShow'
"""Final[str]: 降臨ショーのユーザーネーム。
"""
LIMIT_N_TWEETS: Final[int] = 100
"""Final[int]: 取得するツイート数の上限。
"""
REPORT_INTERVAL: Final[int] = 5
"""Final[int]: 記録件数を報告するインターバル。
"""
TWEETS_OR_REPLIES: Final[str] = 'with_replies'
"""Final[str]: NitterのURLのドメインとユーザーネーム部分の接続部品。
"""
NITTER_ERROR_TITLE: Final[str] = 'Error|nitter'
"""Final[str]: Nitterでユーザーがいなかったとき返ってくるページのタイトル。
万が一仕様変更で変わったとき用。
"""
NO_ARCHIVE: Final[str] = 'No results'
"""Final[str]: archive.todayで魚拓がなかったときのレスポンス。
万が一仕様変更で変わったとき用。
"""
NEWEST: Final[str] = 'Load newest'
"""Final[str]: Nitterの前ページ読み込み部分の名前。
万が一仕様変更で変わったとき用。
"""
MEDIA_DIR: Final[str] = 'tweet_media'
"""Final[str]: 画像などのツイートメディアをダウンロードするディレクトリ。
"""
def __init__(self):
"""コンストラクタ。
"""
self._check_slash() # スラッシュが抜けてないかチェック
def _set_queries(self, accessor: AccessorHandler, krsw: bool) -> None:
"""検索条件を設定する。
:class:`~TwitterArchiver` のメンバをセットし、ツイートを収集するアカウントと
検索クエリ、終わりにするツイートを入力させる。
Args:
accessor AccessorHandler: アクセスハンドラ
krsw bool: Trueの場合、名前が自動で :const:`~CALLINSHOW` になり、クエリと終わりにするツイートが自動で無しになる。
"""
# ユーザー名取得
if krsw:
logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます')
self._name: str = self.CALLINSHOW if krsw else self._get_name(accessor)
# 検索クエリとページ取得
self.query_strs: list[str] = []
if krsw:
logger.info('クエリは自動的になしにナリます')
else:
self._get_query()
page_optional: str | None = accessor.request(
urljoin(self.NITTER_INSTANCE, self._name + '/'
+ self.TWEETS_OR_REPLIES))
if page_optional is None:
self._fail()
self._page: str = page_optional
# 終わりにするツイート取得
if krsw:
logger.info('終わりにするツイートは自動的になしにナリます')
self._stop: str = '' if krsw else self._stop_word()
# 日付取得
timeline_item: Tag | NavigableString | None = BeautifulSoup(
self._page, 'html.parser').find(
class_='timeline-item')
assert isinstance(timeline_item, Tag)
date: datetime = self._tweet_date(timeline_item)
self._table_builder: TableBuilder = TableBuilder(date)
def _check_slash(self) -> None | NoReturn:
"""URLの最後にスラッシュが付いていなければエラーを出します。
Returns:
None | NoReturn: すべてのURLが正しければNone。失敗したら例外を出す。
Raises:
RuntimeError: URLの最後にスラッシュがついていない場合に出る。
"""
if self.NITTER_INSTANCE[-1] != '/':
raise RuntimeError('NITTER_INSTANCEの末尾には/が必須です')
if self.ARCHIVE_TODAY[-1] != '/':
raise RuntimeError('ARCHIVE_TODAYの末尾には/が必須です')
if self.ARCHIVE_TODAY_STANDARD[-1] != '/':
raise RuntimeError('ARCHIVE_TODAY_STANDARDの末尾には/が必須です')
if self.TWITTER_URL[-1] != '/':
raise RuntimeError('TWITTER_URLの末尾には/が必須です')
def _check_nitter_instance(
self, accessor: AccessorHandler) -> None | NoReturn:
"""Nitterのインスタンスが生きているかチェックする。
死んでいたらそこで終了。
接続を一回しか試さない :func:`~_request_once` を使っているのは、
激重インスタンスが指定されたとき試行回数増やして偶然成功してそのまま実行されるのを躱すため。
Args:
accessor AccessorHandler: アクセスハンドラ
Returns:
None | NoReturn: NitterにアクセスできればNone。できなければ終了。
"""
logger.info('Nitterのインスタンスチェック中ですを')
try:
accessor.request_once(self.NITTER_INSTANCE)
except AccessError as e: # エラー発生時は終了
logger.critical(e)
logger.critical('インスタンスが死んでますを')
exit(1)
logger.info('Nitter OK')
def _check_archive_instance(
self, accessor: AccessorHandler) -> None | NoReturn:
"""archive.todayのTor用インスタンスが生きているかチェックする。
Args:
accessor AccessorHandler: アクセスハンドラ
Returns:
None | NoReturn: archive.todayのTorインスタンスにアクセスできればNone。できなければ終了。
"""
logger.info('archive.todayのTorインスタンスチェック中ですを')
try:
accessor.request_once(self.ARCHIVE_TODAY)
except AccessError as e: # エラー発生時は終了
logger.critical(e)
logger.critical('インスタンスが死んでますを')
exit(1)
logger.info('archive.today OK')
def _invidious_instances(
self,
accessor: AccessorHandler) -> tuple[str, ...] | NoReturn:
"""Invidiousのインスタンスのタプルを取得する。
Args:
accessor AccessorHandler: アクセスハンドラ
Returns:
tuple[str, ...] | NoReturn: Invidiousのインスタンスのタプル。Invidiousのインスタンスが死んでいれば終了。
"""
logger.info('Invidiousのインスタンスリストを取得中ですを')
invidious_json: Final[str] | None = accessor.request_with_requests_module(
'https://api.invidious.io/instances.json')
if invidious_json is None:
logger.critical('Invidiousが死んでますを')
exit(1)
instance_list: list[str] = []
for instance_info in json.loads(invidious_json):
instance_list.append(instance_info[0])
# よく使われているものはチェック
if 'piped.kavin.rocks' not in instance_list:
instance_list.append('piped.kavin.rocks')
if 'piped.video' not in instance_list:
instance_list.append('piped.video')
return tuple(instance_list)
def _get_name(self, accessor: AccessorHandler) -> str | NoReturn:
"""ツイート収集するユーザー名を標準入力から取得する。
何も入力しないと :const:`~CALLINSHOW` を指定する。
Args:
accessor AccessorHandler: アクセスハンドラ
Returns:
str | NoReturn: ユーザ名。ユーザページの取得に失敗したら終了。
"""
while True:
logger.info(
'アカウント名を入れなければない。空白だと自動的に'
+ self.CALLINSHOW
+ 'になりますを')
account_str: str = input()
# 空欄で降臨ショー
if account_str == '':
return self.CALLINSHOW
else:
res: Final[str | None] = accessor.request(
urljoin(self.NITTER_INSTANCE, account_str))
if res is None: # リクエスト失敗判定
self._fail()
soup: BeautifulSoup = BeautifulSoup(res, 'html.parser')
if soup.title == self.NITTER_ERROR_TITLE:
logger.warning(account_str + 'は実在の人物ではありませんでした')
else:
logger.info('最終的に出会ったのが@' + account_str + 'だった。')
return account_str
def _get_query(self) -> None:
"""検索クエリを標準入力から取得する。
取得したクエリは ``self.query_strs`` に加えられる。
"""
logger.info('検索クエリを入れるナリ。複数ある時は一つの塊ごとに入力するナリ。空欄を入れると検索開始ナリ。')
logger.info('例:「無能」→改行→「脱糞」→改行→「弟殺し」→改行→空欄で改行')
query_input: str = input()
# 空欄が押されるまでユーザー入力受付
while query_input != '':
self.query_strs.append(query_input)
query_input = input()
logger.info('クエリのピースが埋まっていく。')
def _fail(self) -> NoReturn:
"""接続失敗時処理。
取得に成功した分だけファイルにダンプし、プログラムを終了する。
"""
logger.critical('接続失敗しすぎで強制終了ナリ')
if self._table_builder.count > 0: # 取得成功したデータがあれば発行
logger.critical('取得成功した分だけ発行しますを')
self._table_builder.dump_file()
exit(1)
def _stop_word(self) -> str:
"""ツイートの記録を中断するための文をユーザに入力させる。
Returns:
str: ツイートの記録を中断するための文。ツイート内容の一文がその文と一致したら記録を中断する。
"""
logger.info(
f'ここにツイートの本文を入れる!すると・・・・なんとそれを含むツイートで記録を中断する!(これは本当の仕様です。)ちなみに、空欄だと{self.LIMIT_N_TWEETS}件まで終了しない。')
end_str: Final[str] = input()
return end_str
def _download_media(
self,
media_name: str,
accessor: AccessorHandler) -> bool:
"""ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。
Args:
media_name str: 画像ファイル名。Nitter上のimgタグのsrc属性では、``/pic/media%2F`` に後続する。
accessor AccessorHandler: アクセスハンドラ
Returns:
bool: 保存に成功したかどうか。
"""
os.makedirs(self.MEDIA_DIR, exist_ok=True)
url: Final[str] = urljoin('https://pbs.twimg.com/media/', media_name)
image_bytes: Final[bytes | None] = accessor.request_image(url)
if image_bytes is not None:
with open(os.path.join(self.MEDIA_DIR, media_name), 'wb') as f:
f.write(image_bytes)
return True
else:
return False
def _tweet_date(self, tweet: Tag) -> datetime:
"""ツイートの時刻を取得する。
Args:
tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。
Returns:
datetime: ツイートの時刻。
"""
tweet_date: Tag | NavigableString | None = tweet.find(
class_='tweet-date')
assert isinstance(tweet_date, Tag)
tweet_date_a: Tag | None = tweet_date.a
assert tweet_date_a is not None
date_str: str | list[str] | None = tweet_date_a.get('title')
assert isinstance(date_str, str)
date: datetime = datetime.strptime(
date_str,
'%b %d, %Y · %I:%M %p %Z').replace(
tzinfo=ZoneInfo('UTC')).astimezone(
ZoneInfo('Asia/Tokyo'))
return date
def _get_tweet_media(
self,
tweet: Tag,
accessor: AccessorHandler) -> str:
"""ツイートの画像や動画を取得する。
Args:
tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。
accessor AccessorHandler: アクセスハンドラ
Returns:
str: Wiki記法でのファイルへのリンクの文字列。
"""
tweet_media: Tag | None = tweet.select_one(
'.tweet-body > .attachments') # 引用リツイート内のメディアを選択しないように.tweet-body直下の.attachmentsを選択
media_txt: str = ''
if tweet_media is not None:
media_list: list[str] = []
# ツイートの画像の取得
for image_a in tweet_media.select('.attachment.image a'):
try:
media_name: str = [
group for group in re.search(
r'%2F([^%]*\.jpg)|%2F([^%]*\.jpeg)|%2F([^%]*\.png)|%2F([^%]*\.gif)',
image_a.get('href')).groups() if group][0]
media_list.append(f'[[ファイル:{media_name}|240px]]')
if self._download_media(media_name, accessor):
logger.info(
os.path.join(
self.MEDIA_DIR,
media_name)
+ ' をアップロードしなければない。')
else:
logger.info(
urljoin(
'https://pbs.twimg.com/media/',
media_name)
+ ' をアップロードしなければない。')
except AttributeError:
tweet_url: str = urljoin(
self.TWITTER_URL,
re.sub(
'#[^#]*$',
'',
tweet.find(
class_='tweet-link').get('href')))
logger.error(f'{tweet_url}の画像が取得できませんでしたを 当職無能')
media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]')
# ツイートの動画の取得。ffmpegが入っていなければ手動で取得すること
for i, video_container in enumerate(
tweet_media.select('.attachment.video-container')):
tweet_url: str = urljoin(
self.TWITTER_URL,
re.sub(
'#[^#]*$',
'',
tweet.find(
class_='tweet-link').get('href'))) # ツイートのURL作成
video = video_container.select_one('video')
if video is None:
logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能')
media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
continue
if subprocess.run(['which',
'ffmpeg'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL).returncode != 0:
logger.error(f'ffmpegがないため{tweet_url}の動画が取得できませんでしたを')
media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
else: # ffmpegがある場合
# TODO: ブロックが大きすぎるので別メソッドに切り出す
media_url: str = unquote(
re.search(
r'[^\/]+$',
video.get('data-url')).group(0))
tweet_id: str = tweet_url.split('/')[-1]
# 動画のダウンロード
if accessor.proxies is not None:
returncode: int = subprocess.run(
[
'ffmpeg', '-y', '-http_proxy',
'accessor.proxies["http"]', '-i',
urljoin(self.NITTER_INSTANCE, media_url),
'-c', 'copy',
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts'
],
stdout=subprocess.DEVNULL).returncode
else:
returncode: int = subprocess.run(
[
'ffmpeg', '-y', '-i',
urljoin(self.NITTER_INSTANCE, media_url),
'-c', 'copy',
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts'
],
stdout=subprocess.DEVNULL).returncode
# 取得成功したらtsをmp4に変換
if returncode == 0:
ts2mp4_returncode: int = subprocess.run(
[
'ffmpeg', '-y', '-i',
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts',
'-acodec', 'copy', '-vcodec', 'copy',
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4'
],
stdout=subprocess.DEVNULL).returncode
if ts2mp4_returncode == 0:
logger.info(
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4をアップロードしなければない。')
else:
logger.info(
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.tsをmp4に変換してアップロードしなければない。')
media_list.append(f'[[ファイル:{tweet_id}_{i}.mp4|240px]]')
else:
logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能')
media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]')
media_txt = ' '.join(media_list)
return media_txt
def _get_tweet_quote(
self,
tweet: Tag,
accessor: AccessorHandler) -> str:
"""引用リツイートの引用元へのリンクを取得する。
Args:
tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。
accessor AccessorHandler: アクセスハンドラ
Returns:
str: Archiveテンプレートでラップされた引用元ツイートへのリンク。
"""
tweet_quote: Final[Tag | None] = tweet.select_one(
'.tweet-body > .quote.quote-big') # 引用リツイートを選択
quote_txt: str = ''
if tweet_quote is not None:
link: str = tweet_quote.select_one('.quote-link').get('href')
link = re.sub('#.*$', '', link)
link = urljoin(self.TWITTER_URL, link)
quote_txt = self._archive_url(link, accessor)
tweet_quote_unavailable: Final[Tag | None] = tweet.select_one(
'.tweet-body > .quote.unavailable') # 引用リツイートを選択
if tweet_quote_unavailable is not None:
quote_txt = '(引用元が削除されました)'
return quote_txt
def _get_tweet_poll(self, tweet: Tag) -> str:
"""ツイートの投票結果を取得する。
Args:
tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。
Returns:
str: Wiki形式に書き直した投票結果。
"""
tweet_poll: Final[Tag | None] = tweet.select_one(
'.tweet-body > .poll')
poll_txt: str = ''
if tweet_poll is not None:
poll_meters: Final[ResultSet[Tag]] = tweet_poll.select(
'.poll-meter')
for poll_meter in poll_meters:
ratio: str = poll_meter.select_one('.poll-choice-value').text
if 'leader' in poll_meter['class']:
poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgba(29, 155, 240, 0.58) 0 {ratio}, transparent {ratio} 100%); font-weight: bold;">' + ratio + ' ' + poll_meter.select_one(
'.poll-choice-option').text + '</span>'
else:
poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgb(207, 217, 222) 0 {ratio}, transparent {ratio} 100%);">' + ratio + ' ' + poll_meter.select_one(
'.poll-choice-option').text + '</span>'
poll_txt += '<br>\n <span style="font-size: small;">' + \
tweet_poll.select_one('.poll-info').text + '</span>'
return poll_txt
def _get_timeline_items(self,
soup: BeautifulSoup) -> list[Tag]:
"""タイムラインのツイートを取得。
基本的に投稿時刻の降順に取得し、リプライツリーは最後のツイートの時刻を基準として降順にひとまとまりにする。
Args:
soup BeautifulSoup: Nitterのページを表すBeautifulSoupオブジェクト。
Returns:
list[Tag]: ツイートのアイテムである ``.timeline-item`` タグを表すTagオブジェクトのリスト。
"""
timeline_item_list: list[Tag] = []
for item_or_list in soup.select(
'.timeline > .timeline-item, .timeline > .thread-line'):
if 'unavailable' in item_or_list.attrs['class']:
continue
elif 'thread-line' in item_or_list.attrs['class']:
# そのままtimeline-itemクラスをfind_allするとツイートの順番が逆転するので、順番通りに取得するよう処理
for item in reversed(item_or_list.select('.timeline-item')):
timeline_item_list.append(item)
else:
timeline_item_list.append(item_or_list)
return timeline_item_list
def _get_tweet(self, accessor: AccessorHandler) -> None | NoReturn:
"""ページからツイート本文を ``self._txt_data`` に収めていく。
ツイートの記録を中断するための文を発見するか、記録したツイート数が :const:`~LIMIT_N_TWEETS` 件に達したら終了する。
Args:
accessor AccessorHandler: アクセスハンドラ
"""
soup: Final[BeautifulSoup] = BeautifulSoup(
self._page, 'html.parser') # beautifulsoupでレスポンス解析
tweets: Final[list[Tag]] = self._get_timeline_items(
soup) # 一ツイートのブロックごとにリストで取得
for tweet in tweets: # 一ツイート毎に処理
if tweet.a.text == self.NEWEST:
# Load Newestのボタンは処理しない
continue
if tweet.find(class_='retweet-header') is not None:
# retweet-headerはリツイートを示すので入っていれば処理しない
continue
if tweet.find(class_='pinned') is not None:
# pinnedは固定ツイートを示すので入っていれば処理しない
continue
if len(self.query_strs) > 0:
# クエリが指定されている場合、一つでも含まないツイートは処理しない、TODO: 未テスト
not_match: bool = False
for query_str in self.query_strs:
if query_str not in tweet.text:
not_match = True
break
if not_match:
continue
tweet_url: str = urljoin(
self.TWITTER_URL,
re.sub(
'#[^#]*$',
'',
tweet.find(
class_='tweet-link').get('href')))
date: datetime = self._tweet_date(tweet)
self._table_builder.next_day_if_necessary(date)
archived_tweet_url: str = self._callinshowlink_url(
tweet_url, accessor)
tweet_content: Tag = tweet.find(
class_='tweet-content media-body')
self._archive_soup(tweet_content, accessor)
media_txt: str = self._get_tweet_media(tweet, accessor)
quote_txt: str = self._get_tweet_quote(tweet, accessor)
poll_txt: str = self._get_tweet_poll(tweet)
self._table_builder.append(
archived_tweet_url, '<br>\n'.join(
filter(
None, [
self._table_builder.escape_wiki_reserved_words(
tweet_content.get_text()), quote_txt, media_txt, poll_txt])))
if self._table_builder.count % self.REPORT_INTERVAL == 0:
logger.info(
f'ツイートを{self._table_builder.count}件も記録したンゴwwwwwwwwwww')
if self._stop != '' and self._stop in tweet_content.get_text(): # 目的ツイートか判定
logger.info('目的ツイート発見でもう尾張屋根')
self._table_builder.dump_file()
if self._table_builder.count >= self.LIMIT_N_TWEETS:
logger.info(f'{self.LIMIT_N_TWEETS}件も記録している。もうやめにしませんか。')
self._table_builder.dump_file()
def _archive_soup(
self,
tag: Tag,
accessor: AccessorHandler) -> None:
"""ツイート内のaタグをテンプレートArchiveの文字列に変化させる。
NitterリンクをYouTubeへのリンクに、bibliogramへのリンクをInstagramへのリンクに修正する。
Args:
tag Tag: ツイート内容の本文である ``.media-body`` タグを表すbeautifulsoup4タグ。
accessor AccessorHandler: アクセスハンドラ
"""
urls_in_tweet: Final[ResultSet[Tag]] = tag.find_all(
'a')
for url in urls_in_tweet:
href: str | list[str] | None = url.get('href')
assert isinstance(href, str)
if href.startswith('https://') or href.startswith('http://'):
# 先頭にhttpが付いていない物はハッシュタグの検索ページへのリンクなので処理しない
if href.startswith('https' + self.NITTER_INSTANCE[4:]):
# Nitter上のTwitterへのリンクを直す
url_link: str = href.replace(
'https' + self.NITTER_INSTANCE[4:], self.TWITTER_URL)
url_link = re.sub('\\?.*$', '', url_link)
url.replace_with(self._archive_url(url_link, accessor))
elif href.startswith('https://nitter.kavin.rocks/'):
# Nitter上のTwitterへのリンクを直す
url_link: str = href.replace(
'https://nitter.kavin.rocks/', self.TWITTER_URL)
url_link = re.sub('\\?.*$', '', url_link)
url.replace_with(self._archive_url(url_link, accessor))
elif self._invidious_pattern.search(href):
# Nitter上のYouTubeへのリンクをInvidiousのものから直す
if re.match(
'https://[^/]+/[^/]+/',
href) or re.search(
'/@[^/]*$',
href):
href = self._invidious_pattern.sub('youtube.com', href)
else:
href = self._invidious_pattern.sub('youtu.be', href)
url.replace_with(self._archive_url(href, accessor))
elif href.startswith('https://bibliogram.art/'):
# Nitter上のInstagramへのリンクをBibliogramのものから直す
# Bibliogramは中止されたようなのでそのうちリンクが変わるかも
url_link: str = href.replace(
'https://bibliogram.art/',
'https://www.instagram.com/')
url.replace_with(self._archive_url(url_link, accessor))
else:
url.replace_with(self._archive_url(href, accessor))
elif url.text.startswith('@'):
url_link: str = urljoin(self.TWITTER_URL, href)
url_text: str = url.text
url.replace_with(
self._archive_url(
url_link, accessor, url_text))
def _archive_url(
self,
url: str,
accessor: AccessorHandler,
text: str | None = None) -> str:
"""URLをArchiveテンプレートでラップする。
フラグメント識別子がURLに含まれていたら、Archive側のURLにも反映させる。
Args:
url str: ラップするURL。
accessor AccessorHandler: アクセスハンドラ。
text str|None: ArchiveテンプレートでURLの代わりに表示する文字列。
Returns:
str: ArchiveタグでラップしたURL。
"""
if '#' in url: # フラグメント識別子の処理
main_url, fragment = url.split('#', maxsplit=1)
return self._table_builder.archive_url(
url, self._archive(main_url, accessor) + '#' + fragment, text)
else:
return self._table_builder.archive_url(
url, self._archive(url, accessor), text)
def _callinshowlink_url(self, url: str, accessor: AccessorHandler) -> str:
"""URLをCallinShowLinkテンプレートでラップする。
Args:
url str: ラップするURL。
accessor AccessorHandler: アクセスハンドラ。
Returns:
str: CallinShowLinkタグでラップしたURL。
"""
return self._table_builder.callinshowlink_url(
url, self._archive(url, accessor))
def _archive(self, url: str, accessor: AccessorHandler) -> str:
"""URLから対応するarchive.todayのURLを返す。
取得できれば魚拓ページのURLを返す。
魚拓がなければその旨の警告を表示し、https://archive.ph/https%3A%2F%2Fxxxxxxxxの形で返される。
アクセスに失敗すればその旨の警告を表示し、https://archive.ph/https%3A%2F%2Fxxxxxxxxの形で返される。
Args:
url str: 魚拓を取得するURL。
accessor AccessorHandler: アクセスハンドラ
Returns:
str: 魚拓のURL。
"""
archive_url: str = urljoin(
self.ARCHIVE_TODAY_STANDARD,
quote(
unquote(url),
safe='&=+?%')) # wikiに載せるとき用URLで失敗するとこのままhttps://archive.ph/https%3A%2F%2Fxxxxxxxxの形で返される
res: Final[str | None] = accessor.request(urljoin(
self.ARCHIVE_TODAY, quote(unquote(url), safe='&=+?%')))
if res is None: # 魚拓接続失敗時処理
logger.error(archive_url + 'にアクセス失敗ナリ。出力されるテキストにはそのまま記載されるナリ。')
else:
soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser')
content: Tag = soup.find(
id='CONTENT') # archive.todayの魚拓一覧ページの中身だけ取得
if content is None or content.get_text()[:len(
self.NO_ARCHIVE)] == self.NO_ARCHIVE: # 魚拓があるかないか判定
logger.warning(url + 'の魚拓がない。これはいけない。')
else:
archive_url = content.find('a').get('href').replace(
self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD)
return archive_url
def _go_to_new_page(self, accessor: AccessorHandler) -> None | NoReturn:
"""Nitterで次のページに移動する。
次のページが無ければプログラムを終了する。
Args:
accessor AccessorHandler: アクセスハンドラ
"""
soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser')
show_mores: Final[ResultSet[Tag]
] = soup.find_all(class_='show-more')
new_url: str = '' # ここで定義しないと動かなくなった、FIXME?
for show_more in show_mores: # show-moreに次ページへのリンクか前ページへのリンクがある
if show_more.text != self.NEWEST: # 前ページへのリンクではないか判定
new_url = urljoin(
self.NITTER_INSTANCE,
self._name
+ '/'
+ self.TWEETS_OR_REPLIES
+ show_more.a.get('href')) # 直下のaタグのhrefの中身取ってURL頭部分と合体
res: Final[str | None] = accessor.request(new_url)
if res is None:
self._fail()
new_page_soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser')
if new_page_soup.find(
class_='timeline-end') is None:
# ツイートの終端ではtimeline-endだけのページになるので判定
logger.info(new_url + 'に移動しますを')
self._page = res # まだ残りツイートがあるのでページを返して再度ツイート本文収集
else:
logger.info('急に残りツイートが無くなったな終了するか')
self._table_builder.dump_file()
def execute(self, krsw: bool = False, use_browser: bool = True,
enable_javascript: bool = True) -> NoReturn:
"""通信が必要な部分のロジック。
Args:
krsw bool: Trueの場合、名前が自動で :const:`~CALLINSHOW` になり、クエリと終わりにするツイートが自動で無しになる。
use_browser bool: TrueならSeleniumを利用する。FalseならRequestsのみでアクセスする。
enable_javascript bool: SeleniumでJavaScriptを利用する場合はTrue。
"""
# Seleniumドライバーを必ず終了するため、with文を利用する。
with AccessorHandler(use_browser, enable_javascript) as accessor:
# 実行前のチェック
self._check_nitter_instance(accessor)
self._check_archive_instance(accessor)
# Invidiousのインスタンスリストの正規表現パターンを取得
invidious_url_tuple: Final[tuple[str, ...]
] = self._invidious_instances(accessor)
self._invidious_pattern: re.Pattern[str] = re.compile(
'|'.join(invidious_url_tuple))
# 検索クエリの設定
self._set_queries(accessor, krsw)
# ツイートを取得し終えるまでループ
while True:
self._get_tweet(accessor)
self._go_to_new_page(accessor)
if __name__ == '__main__':
if sys.version_info.major < 3 or (
sys.version_info.major == 3 and sys.version_info.minor < 11):
logger.critical('Pythonのバージョンを3.11以上に上げて下さい')
exit(1)
parser: ArgumentParser = ArgumentParser()
parser.add_argument(
'--krsw',
action='store_true',
help='指定すると、パカデブのツイートを取得上限数まで取得する。')
parser.add_argument(
'-n',
'--no_browser',
action='store_true',
help='指定すると、Tor Browserを利用しない。')
parser.add_argument(
'-d',
'--disable_script',
action='store_true',
help='指定すると、Tor BrowserでJavaScriptを利用しない。')
args: Namespace = parser.parse_args()
twitter_archiver: TwitterArchiver = TwitterArchiver()
twitter_archiver.execute(
args.krsw,
not args.no_browser,
not args.disable_script)
実行例
20件での実行例。
12月10日
https://twitter.com/CallinShow/status/1601539154256744449(魚拓) |
---|
https://twitter.com/CallinShow/status/1601542511302160384(魚拓) |
https://twitter.com/CallinShow/status/1601569138379718656(魚拓) |
https://youtu.be/QR6Gj0MKcew(魚拓) |
https://twitter.com/CallinShow/status/1601572951463428096(魚拓) |
菊地翔 |
https://twitter.com/CallinShow/status/1601588268487041024(魚拓) |
日曜阪神11R 阪神JF |
12月11日
12月12日
https://twitter.com/CallinShow/status/1601955353792430081(魚拓) |
---|
エクシア合同会社のツイートの中に、岡ちゃんのツイートをリツイートしてしまってごめん。 |
https://twitter.com/CallinShow/status/1601958600259371011(魚拓) |
日曜日に誰もエクシア合同会社のことなんか、呟きたいなんて思わないのが普通だと思う。 |
https://twitter.com/CallinShow/status/1601959896252981249(魚拓) |
ときにはふざけたツイートをしているように思うかもしれない。 |
https://twitter.com/CallinShow/status/1601961096528613376(魚拓) |
エクシア合同会社と闘う大人たちがやっていることは、Twitterを使った世直し運動なんだ。 |