→コード: v4.0.2 JavaScriptをオフにするオプションと、Tor Browserを利用しないオプションを追加
>Fet-Fe (→コード: v4.0.1 ちゃんとsleepしてなかったので修正) |
>Fet-Fe (→コード: v4.0.2 JavaScriptをオフにするオプションと、Tor Browserを利用しないオプションを追加) |
||
7行目: | 7行目: | ||
"""Twitter自動収集スクリプト | """Twitter自動収集スクリプト | ||
ver4.0. | ver4.0.2 2023/9/25恒心 | ||
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です | 当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です | ||
13行目: | 13行目: | ||
Examples: | Examples: | ||
定数類は状況に応じて変えてください。 | |||
:: | |||
$ python3 (ファイル名) | |||
オプションに ``--krsw`` とつけると自動モードになります。 | |||
:: | |||
$ python3 (ファイル名) --krsw | |||
``--no_use_browser`` オプションでTor Browserを使用しないモードに、``--disable_script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。 | |||
自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。 | |||
つまりユーザー入力が要りません。 | |||
Note: | Note: | ||
* Pythonのバージョンは3.10以上 | |||
* 環境は玉葱前提です。 | |||
* Whonix-Workstation, MacOSで動作確認済 | |||
* MacOSの場合はbrewでtorコマンドを導入し、実行 | |||
* PySocks, bs4, seleniumはインストールしないと標準で入ってません | |||
* requestsも環境によっては入っていないかもしれない | |||
* $ pip install bs4 requests PySocks selenium | |||
* pipも入っていなければ ``$ sudo apt install pip`` | |||
* `ffmpeg <https://ffmpeg.org>`_ が入っていると動画も自動取得しますが、無くても動きます | |||
* バグ報告は `利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて <https://krsw-wiki.org/wiki/利用者・トーク:夜泣き#利用者:夜泣き/スクリプトについて>`_ | |||
""" | """ | ||
60行目: | 62行目: | ||
import subprocess | import subprocess | ||
import platform | import platform | ||
from logging import getLogger, Logger | |||
from argparse import ArgumentParser, Namespace | |||
import requests | import requests | ||
70行目: | 74行目: | ||
from selenium.webdriver.support import expected_conditions as EC | from selenium.webdriver.support import expected_conditions as EC | ||
from selenium.webdriver.common.by import By | from selenium.webdriver.common.by import By | ||
logger: Logger = getLogger(__name__) | |||
##おそらくはツイートの内容によってMarkupResemblesLocatorWarningが吐き出されることがあるので無効化 | ##おそらくはツイートの内容によってMarkupResemblesLocatorWarningが吐き出されることがあるので無効化 | ||
76行目: | 82行目: | ||
class AccessError(Exception): | class AccessError(Exception): | ||
"""RequestsとSeleniumで共通のアクセスエラー。 | |||
""" | |||
pass | |||
class | class ReCaptchaFoundError(Exception): | ||
"""JavaScriptがオフの時にreCAPTCHAを要求された場合のエラー。 | |||
""" | |||
pass | |||
class AbstractAccessor: | |||
"""HTTPリクエストでWebサイトに接続するための基底クラス。 | |||
""" | |||
WAIT_TIME: Final[int] = 1 | |||
"""Final[int]: HTTPリクエスト成功失敗関わらず待機時間。 | |||
1秒待つだけで行儀がいいクローラーだそうなので既定では1秒。 | |||
しかし日本のポリホーモは1秒待っていても捕まえてくるので注意。 | |||
https://ja.wikipedia.org/wiki/?curid=2187212 | |||
""" | |||
WAIT_RANGE: Final[int] = 5 | |||
""" | """Final[int]: ランダムな時間待機するときの待機時間の幅。 | ||
""" | |||
REQUEST_TIMEOUT: Final[int] = 30 | |||
"""Final[int]: HTTPリクエストのタイムアウト秒数。 | |||
""" | """ | ||
def _random_sleep(self) -> None: | |||
"""ランダムな秒数スリープする。 | |||
自動操縦だとWebサイトに見破られないため。 | |||
""" | |||
sleep(random.randrange(self.WAIT_TIME, self.WAIT_TIME + self.WAIT_RANGE)) | |||
class RequestsAccessor(AbstractAccessor): | |||
"""requestsモジュールでWebサイトに接続するためのクラス。 | |||
""" | |||
HEADERS: Final[dict[str, str]] = { | |||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0' | |||
} | |||
"""Final[dict[str, str]]: HTTPリクエスト時のヘッダ。 | |||
""" | |||
PROXIES_WITH_COMMAND: Final[dict[str, str]] = { | |||
'http': 'socks5h://127.0.0.1:9050', | |||
'https': 'socks5h://127.0.0.1:9050' | |||
} | |||
"""Final[dict[str, str]]: torコマンドを起動しているときのHTTPプロキシの設定。 | |||
""" | |||
PROXIES_WITH_BROWSER: Final[dict[str, str]] = { | |||
""" | 'http': 'socks5h://127.0.0.1:9150', | ||
'https': 'socks5h://127.0.0.1:9150' | |||
} | |||
"""Final[dict[str, str]]: Tor Browserを起動しているときのHTTPプロキシの設定。 | |||
""" | """ | ||
TOR_CHECK_URL: Final[str] = 'https://check.torproject.org/api/ip' | |||
"""Final[str]: Tor経由で通信しているかチェックするサイトのURL。 | |||
""" | |||
def __init__(self): | |||
"""コンストラクタ。 | |||
""" | |||
self._proxies: dict[str, str] | None = None | |||
self._proxies = self._get_tor_proxies() ##Torに必要なプロキシをセット | |||
def _execute(self, url: Final[str], proxies: dict[str, str] | None) -> requests.models.Response: | |||
"""引数のURLにrequestsモジュールでHTTP接続する。 | |||
Args: | |||
url Final[str]: 接続するURL。 | |||
proxies dir[str, str]: 接続に利用するプロキシ。デフォルトでは :func:`~_get_tor_proxies` で設定した値を利用する。 | |||
Returns: | |||
requests.models.Response: レスポンスのオブジェクト。 | |||
Raises: | |||
requests.exceptions.ConnectionError: Torで接続ができないなど、接続のエラー。 | |||
AccessError: ステータスコードが200でない場合のエラー。 | |||
""" | |||
sleep(self.WAIT_TIME) ##DoS対策で待つ | |||
try: | |||
res: requests.models.Response = requests.get(url, timeout=self.REQUEST_TIMEOUT, headers=self.HEADERS, allow_redirects=False, proxies=proxies if proxies is not None else self._proxies) | |||
res.raise_for_status() ##HTTPステータスコードが200番台以外でエラー発生 | |||
except requests.exceptions.ConnectionError: | |||
raise | |||
except requests.exceptions.RequestException as e: | |||
# requestsモジュール固有の例外を共通の例外に変換 | |||
raise AccessError(str(e)) from None | |||
return res | |||
def get(self, url: Final[str], proxies: dict[str, str]=None) -> str: | |||
"""引数のURLにrequestsモジュールでHTTP接続する。 | |||
Args: | |||
url Final[str]: 接続するURL。 | |||
proxies dir[str, str]: 接続に利用するプロキシ。デフォルトでは :func:`~_get_tor_proxies` で設定した値を利用する。 | |||
Returns: | |||
str: レスポンスのHTML。 | |||
Raises: | |||
requests.exceptions.ConnectionError: Torで接続ができないなど、接続のエラー。 | |||
AccessError: ステータスコードが200でない場合のエラー。 | |||
""" | |||
try: | |||
return self._execute(url, proxies).text | |||
except (requests.exceptions.ConnectionError, AccessError): | |||
raise | |||
def get_image(self, url: Final[str]) -> bytes | None: | |||
"""引数のURLから画像のバイナリ列を取得する。 | |||
Args: | |||
url Final[str]: 接続するURL | |||
Returns: | |||
bytes | None: 画像のバイナリ。画像でなければNone。 | |||
Raises: | |||
requests.exceptions.ConnectionError: Torで接続ができないなど、接続のエラー。 | |||
AccessError: ステータスコードが200でない場合のエラー。 | |||
""" | |||
try: | |||
res: requests.models.Response = self._execute(url, self._proxies) | |||
except (requests.exceptions.ConnectionError, AccessError): | |||
raise | |||
if 'image' in res.headers['content-type']: | |||
return res.content | |||
else: | |||
return None | |||
def _get_tor_proxies(self) -> dict[str, str] | None | NoReturn: | |||
"""Torを使うのに必要なプロキシ情報を返す。 | |||
プロキシなしで接続できればNone、Tor Browserのプロキシで接続できるなら :const:`~PROXIES_WITH_BROWSER`、torコマンドのプロキシで接続できるなら :const:`~PROXIES_WITH_COMMAND`。 | |||
いずれでもアクセスできなければ異常終了する。 | |||
Returns: | |||
dict[str, str] | None | NoReturn: プロキシ情報。 | |||
Raises: | |||
RuntimeError: :const:`~TOR_CHECK_URL` にアクセスできない場合のエラー。 | |||
""" | |||
print('Torのチェック中ですを') | |||
# プロキシなしでTorにアクセスできるかどうか | |||
res: str = self.get(self.TOR_CHECK_URL, proxies=None) | |||
is_tor: bool = json.loads(res)['IsTor'] | |||
if is_tor: | |||
print('Tor connection OK') | |||
return None | |||
# Tor BrowserのプロキシでTorにアクセスできるかどうか | |||
try: | |||
res = self.get(self.TOR_CHECK_URL, proxies=self.PROXIES_WITH_BROWSER) | |||
is_tor = json.loads(res)['IsTor'] | |||
if is_tor: | |||
print('Tor connection OK') | |||
return self.PROXIES_WITH_BROWSER | |||
except requests.exceptions.ConnectionError: | |||
pass | |||
# torコマンドのプロキシでTorにアクセスできるかどうか | |||
try: | |||
res = self.get(self.TOR_CHECK_URL, proxies=self.PROXIES_WITH_COMMAND) | |||
is_tor = json.loads(res)['IsTor'] | |||
if is_tor: | |||
print('Tor proxy OK') | |||
return self.PROXIES_WITH_COMMAND | |||
else: | |||
raise RuntimeError('サイトにTorのIPでアクセスできていないなりを') | |||
except requests.exceptions.ConnectionError as e: | |||
print(e, file=sys.stderr) | |||
print('通信がTorのSOCKS proxyを経由していないなりを', file=sys.stderr) | |||
exit(1) | |||
@property | |||
def proxies(self) -> dict[str, str] | None: | |||
"""オブジェクトのプロキシ設定を返す。 | |||
Returns: | |||
dict[str, str] | None: プロキシ設定。 | |||
""" | |||
return self._proxies | |||
class SeleniumAccessor(AbstractAccessor): | |||
"""SeleniumでWebサイトに接続するためのクラス。 | |||
""" | |||
TOR_BROWSER_PATHS: MappingProxyType[str, str] = MappingProxyType({ | |||
""" | "Windows": "", | ||
"Darwin": "/Applications/Tor Browser.app/Contents/MacOS/firefox", | |||
"Linux": "" | |||
}) | |||
"""MappingProxyType[str, str]: OSごとのTor Browserのパス。 | |||
Todo: | |||
WindowsとLinuxでのTor Browserの実行パスを追加する。 | |||
""" | |||
WAIT_TIME_FOR_INIT: Final[int] = 15 | |||
"""Final[int]: 最初のTor接続時の待機時間。 | |||
""" | """ | ||
def __init__(self, enable_javascript: bool): | |||
"""コンストラクタ。 | |||
Tor Browserを自動操縦するためのSeleniumドライバを初期化する。 | |||
Args: | |||
enable_javascript bool: JavaScriptを有効にするかどうか。reCAPTCHA対策に必要。 | |||
""" | |||
self._javascript_enabled: bool = enable_javascript | |||
options: Options = Options() | |||
options.binary_location = self.TOR_BROWSER_PATHS[platform.system()] | |||
if enable_javascript: | |||
print("reCAPTCHA対策のためJavaScriptをonにしますを") | |||
options.preferences.update({ | |||
"javascript.enabled": enable_javascript, | |||
"intl.accept_languages": "en-US, en", | |||
"intl.locale.requested": "US", | |||
"font.language.group": "x-western", | |||
"dom.webdriver.enabled": False # 自動操縦と見破られないための設定 | |||
}) | |||
try: | |||
self.driver: webdriver.Firefox = webdriver.Firefox(options=options) | |||
sleep(1) | |||
wait_init: WebDriverWait = WebDriverWait(self.driver, self.WAIT_TIME_FOR_INIT) | |||
wait_init.until(EC.element_to_be_clickable((By.ID, "connectButton"))) | |||
self.driver.find_element(By.ID, "connectButton").click() | |||
wait_init.until(EC.url_contains("about:blank")) # Torの接続が完了するまで待つ | |||
self.wait: WebDriverWait = WebDriverWait(self.driver, self.REQUEST_TIMEOUT) | |||
except Exception: | |||
self.quit() | |||
raise | |||
def quit(self) -> None: | |||
"""Seleniumドライバを終了する。 | |||
""" | |||
if self.driver: | |||
self.driver.quit() | |||
def _check_recaptcha(self) -> None: | |||
"""reCAPTCHAが表示されているかどうか判定して、入力を待機する。 | |||
Raises: | |||
ReCaptchaFoundError: JavaScriptがオフの状態でreCAPTCHAが要求された場合のエラー。 | |||
""" | |||
try: | |||
self.driver.find_element(By.CSS_SELECTOR, 'script[src^="https://www.google.com/recaptcha/api.js"]') # 要素がない時に例外を吐く | |||
if self._javascript_enabled: | |||
print("reCAPTCHAを解いてね(笑)、それはできるよね。") | |||
print("botバレしたらNew Tor circuit for this siteを選択するナリよ") | |||
WebDriverWait(self.driver, 10000).until(EC.staleness_of(self.driver.find_element(By.CSS_SELECTOR, 'script[src^="https://www.google.com/recaptcha/api.js"]'))) | |||
sleep(self.WAIT_TIME) ##DoS対策で待つ | |||
else: | |||
raise ReCaptchaFoundError("JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: " + self.driver.current_url) | |||
except NoSuchElementException: | |||
# reCAPTCHAの要素がなければそのまま | |||
pass | |||
def get(self, url: Final[str]) -> str: | |||
"""引数のURLにSeleniumでHTTP接続する。 | |||
Args: | |||
url Final[str]: 接続するURL。 | |||
Returns: | |||
str: レスポンスのHTML。 | |||
""" | |||
self._random_sleep() ##DoS対策で待つ | |||
try: | |||
self.driver.get(url) | |||
self._check_recaptcha() | |||
except WebDriverException as e: | |||
# Selenium固有の例外を共通の例外に変換 | |||
raise AccessError(str(e)) from None | |||
return self.driver.page_source | |||
class AccessorHandler: | |||
"""WebサイトからHTMLを取得するためのクラス。 | |||
RequestsとSeleniumのどちらかを選択して使用することができ、その違いを隠蔽する。 | |||
""" | """ | ||
LIMIT_N_REQUESTS: Final[int] = 5 | |||
""" | """Final[int]: HTTPリクエスト失敗時の再試行回数。 | ||
""" | """ | ||
WAIT_TIME_FOR_ERROR: Final[int] = 4 | |||
""" | """Final[int]: HTTPリクエスト失敗時にさらに追加する待機時間。 | ||
""" | """ | ||
def __init__(self, use_browser: bool, enable_javascript: bool): | |||
"""コンストラクタ。 | |||
Requestのみを利用するか、Seleniumも利用するか引数で選択して初期化する。 | |||
Args: | |||
use_browser bool: TrueならSeleniumを利用する。FalseならRequestsのみでアクセスする。 | |||
enable_javascript bool: SeleniumでJavaScriptを利用する場合はTrue。 | |||
""" | |||
self.selenium_accessor: SeleniumAccessor | None = SeleniumAccessor(enable_javascript) if use_browser else None | |||
self.requests_accessor: RequestsAccessor = RequestsAccessor() | |||
def __enter__(self) -> Self: | |||
"""withブロックの開始時に実行する。 | |||
""" | |||
return self | |||
def __exit__(self, *args) -> None: | |||
"""withブロックの終了時に実行する。 | |||
""" | |||
if self.selenium_accessor is not None: | |||
self.selenium_accessor.quit() | |||
def request_once(self, url: Final[str]) -> str: | |||
"""引数のURLにHTTP接続する。 | |||
Args: | |||
url Final[str]: 接続するURL。 | |||
Returns: | |||
str: レスポンスのテキスト。 | |||
Raises: | |||
AccessError: アクセスエラー。 | |||
Note: | |||
失敗かどうかは呼出側で要判定。 | |||
""" | |||
try: | |||
if self.selenium_accessor is not None: | |||
return self.selenium_accessor.get(url) | |||
else: | |||
return self.requests_accessor.get(url) | |||
except AccessError: | |||
raise | |||
def _request_with_callable(self, url: Final[str], request_callable: Callable[[str], Any]) -> Any | None: | |||
"""request_callableの実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | |||
成功すると結果を返す。 | |||
接続失敗が何度も起きるとNoneを返す。 | |||
Args: | |||
url Final[str]: 接続するURL | |||
request_callable Callable[[str], Any]: 1回リクエストを行うメソッド。 | |||
Returns: | |||
Any | None: レスポンス。接続失敗が何度も起きるとNoneを返す。 | |||
Note: | |||
失敗かどうかは呼出側で要判定 | |||
""" | |||
for i in range(1, self.LIMIT_N_REQUESTS + 1): | |||
try: | |||
res: Any = request_callable(url) | |||
except AccessError: | |||
print(url + 'への通信失敗ナリ ' + f"{i}/{self.LIMIT_N_REQUESTS}回") | |||
if i < self.LIMIT_N_REQUESTS: | |||
sleep(self.WAIT_TIME_FOR_ERROR) ##失敗時は長めに待つ | |||
else: | |||
return res | |||
return None | |||
def request(self, url: Final[str]) -> str | None: | |||
"""HTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | |||
成功すると結果を返す。 | |||
接続失敗が何度も起きるとNoneを返す。 | |||
Args: | |||
url Final[str]: 接続するURL | |||
Returns: | |||
str | None: レスポンスのテキスト。接続失敗が何度も起きるとNoneを返す。 | |||
Note: | |||
失敗かどうかは呼出側で要判定 | |||
""" | |||
return self._request_with_callable(url, self.request_once) | |||
def request_with_requests_module(self, url: Final[str]) -> str | None: | |||
"""requestsモジュールでのHTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | |||
成功すると結果を返す。 | |||
接続失敗が何度も起きるとNoneを返す。 | |||
Args: | |||
url Final[str]: 接続するURL | |||
Returns: | |||
str | None: レスポンスのテキスト。接続失敗が何度も起きるとNoneを返す。 | |||
Note: | |||
失敗かどうかは呼出側で要判定 | |||
""" | |||
return self._request_with_callable(url, self.requests_accessor.get) | |||
def request_image(self, url: Final[str]) -> bytes | None: | |||
"""requestsモジュールで画像ファイルを取得します。 | |||
成功すると結果を返します。 | |||
接続失敗が何度も起きるとNoneを返します。 | |||
Args: | |||
url Final[str]: 接続するURL | |||
Returns: | |||
bytes | None: レスポンスのバイト列。接続失敗が何度も起きるとNoneを返します。 | |||
""" | |||
return self._request_with_callable(url, self.requests_accessor.get_image) | |||
@property | |||
def proxies(self) -> dict[str, str] | None: | |||
return self.requests_accessor.proxies | |||
class TwitterArchiver: | class TwitterArchiver: | ||
"""ツイートをWikiの形式にダンプするクラス。 | |||
Nitterからツイートを取得し、Wikiの形式にダンプする。 | |||
削除されたツイートや編集前のツイートは取得できない。 | |||
""" | |||
NITTER_INSTANCE: Final[str] = 'https://nitter.net/' | |||
"""Final[str]: Nitterのインスタンス。 | |||
生きているのは https://github.com/zedeus/nitter/wiki/Instances で確認。 | |||
Note: | |||
末尾にスラッシュ必須。 | |||
Todo: | |||
Tor専用のインスタンスが使えるようになったら変更する。 | |||
""" | |||
ARCHIVE_TODAY: Final[str] = 'http://archiveiya74codqgiixo33q62qlrqtkgmcitqx5u2oeqnmn5bpcbiyd.onion/' | |||
"""Final[str]: archive.todayの魚拓のonionドメイン。 | |||
ページにアクセスして魚拓があるか調べるのにはonion版のarchive.todayを使用。 | |||
Note: | |||
末尾にスラッシュ必須。 | |||
""" | |||
ARCHIVE_TODAY_STANDARD: Final[str] = 'https://archive.vn/' | |||
"""Final[str]: archive.todayの魚拓のクリアネットドメイン。 | |||
記事にはクリアネット用のarchive.todayリンクを貼る。 | |||
Note: | |||
末尾にスラッシュ必須。 | |||
""" | |||
TWITTER_URL: Final[str] = 'https://twitter.com/' | |||
"""Final[str]: TwitterのURL。 | |||
Note: | |||
末尾にスラッシュ必須。 | |||
""" | |||
CALLINSHOW: Final[str] = 'CallinShow' | |||
"""Final[str]: 降臨ショーのユーザーネーム。 | |||
""" | |||
LIMIT_N_TWEETS: Final[int] = 10000 | |||
"""Final[int]: 取得するツイート数の上限。 | |||
""" | |||
REPORT_INTERVAL: Final[int] = 5 | |||
"""Final[int]: 記録件数を報告するインターバル。 | |||
""" | |||
TWEETS_OR_REPLIES: Final[str] = 'with_replies' | |||
"""Final[str]: NitterのURLのドメインとユーザーネーム部分の接続部品。 | |||
Returns: | |||
dict[str, str] | None: プロキシ設定。 | |||
""" | |||
NITTER_ERROR_TITLE: Final[str] = 'Error|nitter' | |||
"""Final[str]: Nitterでユーザーがいなかったとき返ってくるページのタイトル。 | |||
万が一仕様変更で変わったとき用。 | |||
""" | |||
NO_ARCHIVE: Final[str] = 'No results' | |||
"""Final[str]: archive.todayで魚拓がなかったときのレスポンス。 | |||
万が一仕様変更で変わったとき用。 | |||
""" | |||
NEWEST: Final[str] = 'Load newest' | |||
"""Final[str]: Nitterの前ページ読み込み部分の名前。 | |||
万が一仕様変更で変わったとき用。 | |||
""" | |||
MEDIA_DIR: Final[str] = 'tweet_media' | |||
""" | """Final[str]: 画像などのツイートメディアをダウンロードするディレクトリ。 | ||
""" | """ | ||
def __init__(self): | |||
"""コンストラクタ。 | |||
""" | |||
self._check_slash() ##スラッシュが抜けてないかチェック | |||
self._txt_data: list[str] = [] | |||
self._limit_count: int = 0 ##記録数 | |||
: | def _set_queries(self, accessor: AccessorHandler, krsw: bool): | ||
"""検索条件を設定する。 | |||
:class:`~TwitterArchiver` のメンバをセットし、ツイートを収集するアカウントと | |||
検索クエリ、終わりにするツイートを入力させる。 | |||
Args: | |||
accessor AccessorHandler: アクセスハンドラ | |||
krsw bool: Trueの場合、名前が自動で :const:`~CALLINSHOW` になり、クエリと終わりにするツイートが自動で無しになる。 | |||
""" | |||
##ユーザー名取得 | |||
if krsw: | |||
print('名前は自動的に' + self.CALLINSHOW + 'にナリます') | |||
self.name: Final[str] = self.CALLINSHOW | |||
else: | |||
self.name: Final[str] = self._get_name(accessor) | |||
##検索クエリとページ取得 | |||
self.query_strs: list[str] = [] | |||
if krsw: | |||
print('クエリは自動的になしにナリます') | |||
else: | |||
self._get_query() | |||
self._page: str | None = accessor.request(urljoin(self.NITTER_INSTANCE, self.name + '/' + self.TWEETS_OR_REPLIES)) | |||
if self._page is None: | |||
self._fail() | |||
##終わりにするツイート取得 | |||
if krsw: | |||
print('終わりにするツイートは自動的になしにナリます') | |||
self._stop: Final[str] = '' | |||
else: | |||
self._stop: Final[str] = self._stop_word() | |||
##日付取得 | |||
self._date: datetime = self._tweet_date(BeautifulSoup(self._page, 'html.parser').find(class_='timeline-item')) | |||
self._txt_data.append('') | |||
print() | |||
def _check_slash(self) -> None | NoReturn: | |||
"""URLの最後にスラッシュが付いていなければエラーを出します。 | |||
Returns: | |||
None | NoReturn: すべてのURLが正しければNone。失敗したら例外を出す。 | |||
Raises: | |||
RuntimeError: URLの最後にスラッシュがついていない場合に出る。 | |||
Todo: | |||
できたらこのメソッドなしで動くようにする。 | |||
""" | |||
if self.NITTER_INSTANCE[-1] != '/': | |||
raise RuntimeError('NITTER_INSTANCEの末尾には/が必須です') | |||
if self.ARCHIVE_TODAY[-1] != '/': | |||
raise RuntimeError('ARCHIVE_TODAYの末尾には/が必須です') | |||
if self.ARCHIVE_TODAY_STANDARD[-1] != '/': | |||
raise RuntimeError('ARCHIVE_TODAY_STANDARDの末尾には/が必須です') | |||
if self.TWITTER_URL[-1] != '/': | |||
raise RuntimeError('TWITTER_URLの末尾には/が必須です') | |||
def _check_nitter_instance(self, accessor: AccessorHandler) -> None | NoReturn: | |||
"""Nitterのインスタンスが生きているかチェックする。 | |||
死んでいたらそこで終了。 | |||
接続を一回しか試さない :func:`~_request_once` を使っているのは、激重インスタンスが指定されたとき試行回数増やして偶然成功してそのまま実行されるのを躱すため。 | |||
Args: | |||
accessor AccessorHandler: アクセスハンドラ | |||
Returns: | |||
None | NoReturn: NitterにアクセスできればNone。できなければ終了。 | |||
""" | |||
print("Nitterのインスタンスチェック中ですを") | |||
try: | |||
accessor.request_once(self.NITTER_INSTANCE) | |||
except AccessError as e: ##エラー発生時は終了 | |||
print(e, file=sys.stderr) | |||
print('インスタンスが死んでますを', file=sys.stderr) | |||
exit(1) | |||
print("Nitter OK") | |||
def _check_archive_instance(self, accessor: AccessorHandler) -> None | NoReturn: | |||
"""archive.todayのTor用インスタンスが生きているかチェックする。 | |||
Args: | |||
accessor AccessorHandler: アクセスハンドラ | |||
Returns: | |||
None | NoReturn: archive.todayのTorインスタンスにアクセスできればNone。できなければ終了。 | |||
""" | |||
print("archive.todayのTorインスタンスチェック中ですを") | |||
try: | |||
accessor.request_once(self.ARCHIVE_TODAY) | |||
except AccessError as e: ##エラー発生時は終了 | |||
print(e, file=sys.stderr) | |||
print('インスタンスが死んでますを', file=sys.stderr) | |||
exit(1) | |||
print("archive.today OK") | |||
def _invidious_instances(self, accessor: AccessorHandler) -> tuple[str] | NoReturn: | |||
"""Invidiousのインスタンスのタプルを取得する。 | |||
Args: | |||
accessor AccessorHandler: アクセスハンドラ | |||
Returns: | |||
tuple[str] | NoReturn: Invidiousのインスタンスのタプル。Invidiousのインスタンスが死んでいれば終了。 | |||
""" | |||
print("Invidiousのインスタンスリストを取得中ですを") | |||
invidious_json: Final[str] | None = accessor.request_with_requests_module('https://api.invidious.io/instances.json') | |||
if invidious_json is None: | |||
print("Invidiousが死んでますを") | |||
exit(1) | |||
instance_list: list[str] = [] | |||
for instance_info in json.loads(invidious_json): | |||
instance_list.append(instance_info[0]) | |||
# よく使われているものはチェック | |||
if 'piped.kavin.rocks' not in instance_list: | |||
instance_list.append('piped.kavin.rocks') | |||
if 'piped.video' not in instance_list: | |||
instance_list.append('piped.video') | |||
return tuple(instance_list) | |||
def _get_name(self, accessor: AccessorHandler) -> str | NoReturn: | |||
"""ツイート収集するユーザー名を標準入力から取得する。 | |||
何も入力しないと :const:`~CALLINSHOW` を指定する。 | |||
Args: | |||
accessor AccessorHandler: アクセスハンドラ | |||
Returns: | |||
str | NoReturn: ユーザ名。ユーザページの取得に失敗したら終了。 | |||
""" | |||
while True: | |||
print('アカウント名を入れなければない。空白だと自動的に' + self.CALLINSHOW + 'になりますを') | |||
account_str: str = input() ##ユーザー入力受付 | |||
##空欄で降臨ショー | |||
if account_str == '': | |||
return self.CALLINSHOW | |||
else: | |||
res: Final[str]| None = accessor.request(urljoin(self.NITTER_INSTANCE, account_str)) ##リクエストして結果取得 | |||
if res is None: ##リクエスト失敗判定 | |||
self._fail() | |||
soup: BeautifulSoup = BeautifulSoup(res, 'html.parser') ##beautifulsoupでレスポンス解析 | |||
if soup.title == self.NITTER_ERROR_TITLE: ##タイトルがエラーでないか判定 | |||
print(account_str + "は実在の人物ではありませんでした") ##エラー時ループに戻る | |||
else: | |||
print("最終的に出会ったのが@" + account_str + "だった。") | |||
return account_str ##成功時アカウント名返す | |||
def _get_query(self) -> None: | |||
"""検索クエリを標準入力から取得する。 | |||
取得したクエリは ``self.query_strs`` に加えられる。 | |||
""" | |||
print("検索クエリを入れるナリ。複数ある時は一つの塊ごとに入力するナリ。空欄を入れると検索開始ナリ。") | |||
print("例:「無能」→改行→「脱糞」→改行→「弟殺し」→改行→空欄で改行") | |||
query_input: str = input() ##ユーザー入力受付 | |||
##空欄が押されるまでユーザー入力受付 | |||
while query_input != '': | |||
self.query_strs.append(query_input) | |||
query_input = input() | |||
print("クエリのピースが埋まっていく。") | |||
def _fail(self) -> NoReturn: | |||
"""接続失敗時処理。 | |||
取得に成功した分だけファイルにダンプし、プログラムを終了する。 | |||
""" | |||
print("接続失敗しすぎで強制終了ナリ") | |||
if len(self._txt_data) > 0: ##取得成功したデータがあれば発行 | |||
print("取得成功した分だけ発行しますを") | |||
self._make_txt() | |||
else: | |||
exit(1) ##終了 | |||
def _convert_to_text_table(self, text: str) -> str: | |||
"""``self._txt_data[0]`` にwikiでテーブル表示にするためのヘッダとフッタをつける。 | |||
Args: | |||
text str: ヘッダとフッタがないWikiテーブル。 | |||
Returns: | |||
str: テーブル表示用のヘッダとフッタがついたWikiテーブル。 | |||
""" | |||
return '{|class="wikitable" style="text-align: left;"\n' + text + '|}' | |||
def _make_txt(self) -> NoReturn: | |||
"""Wikiテーブルをファイル出力し、プログラムを終了する。 | |||
""" | |||
self._next_day() | |||
result_txt: Final[str] = '\n'.join(self._txt_data) ##リストを合体 | |||
##ファイル出力 | |||
with codecs.open('tweet.txt', 'w', 'utf-8') as f: | |||
f.write(result_txt) | |||
print("テキストファイル手に入ったやで〜") | |||
exit(0) ##終了 | |||
def _stop_word(self) -> str: | |||
"""ツイートの記録を中断するための文をユーザに入力させる。 | |||
Returns: | |||
str: ツイートの記録を中断するための文。ツイート内容の一文がその文と一致したら記録を中断する。 | |||
""" | |||
print(f"ここにツイートの本文を入れる!すると・・・・なんとそれを含むツイートで記録を中断する!(これは本当の仕様です。)ちなみに、空欄だと{self.LIMIT_N_TWEETS}件まで終了しない。") | |||
end_str: Final[str] = input() ##ユーザー入力受付 | |||
return end_str | |||
def _download_media(self, media_name: Final[str], accessor: AccessorHandler) -> bool: | |||
"""ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | |||
Args: | |||
media_name Final[str]: 画像ファイル名。Nitter上のimgタグのsrc属性では、``/pic/media%2F`` に後続する。 | |||
accessor AccessorHandler: アクセスハンドラ | |||
Returns: | |||
bool: 保存に成功したかどうか。 | |||
""" | |||
os.makedirs(self.MEDIA_DIR, exist_ok=True) | |||
url: Final[str] = urljoin('https://pbs.twimg.com/media/', media_name) | |||
image_bytes: Final[bytes | None] = accessor.request_image(url) | |||
if image_bytes is not None: | |||
with open(os.path.join(self.MEDIA_DIR, media_name), "wb") as f: | |||
f.write(image_bytes) | |||
return True | |||
else: | |||
return False | |||
def _tweet_date(self, tweet: bs4.element.Tag) -> datetime: | |||
"""ツイートの時刻を取得する。 | |||
Args: | |||
tweet bs4.element.Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | |||
Returns: | |||
datetime: ツイートの時刻。 | |||
""" | |||
date_str: str = tweet.find(class_='tweet-date').a['title'] | |||
date: datetime = datetime.strptime(date_str, '%b %d, %Y · %I:%M %p %Z').replace(tzinfo=ZoneInfo('UTC')).astimezone(ZoneInfo('Asia/Tokyo')) | |||
return date | |||
#self._dateの日付のツイートがなくなったときの処理 | |||
def _next_day(self, date: datetime|None = None) -> None: | |||
"""1日分のツイートをテーブル形式に変換し、その日のツイートを記録し終わったことを通知して、``self._txt_data`` の0番目に空文字列を追加する。 | |||
Args: | |||
date datetime|None: | |||
記録した日付の前日の日付。Noneでなければ、``self._date`` をその値に更新する。 | |||
""" | |||
if self._txt_data[0]: # 空でなければ出力 | |||
self._txt_data[0] = self._convert_to_text_table(self._txt_data[0]) | |||
if os.name == 'nt': # Windows | |||
self._txt_data[0] = self._date.strftime('\n=== %#m月%#d日 ===\n') + self._txt_data[0] | |||
print(self._date.strftime('%#m月%#d日のツイートを取得完了ですを')) | |||
else: # Mac or Linux | |||
self._txt_data[0] = self._date.strftime('\n=== %-m月%-d日 ===\n') + self._txt_data[0] | |||
print(self._date.strftime('%-m月%-d日のツイートを取得完了ですを')) | |||
if date is not None: | |||
self._txt_data.insert(0, '') | |||
self._date = date | |||
def _get_tweet_media(self, tweet: bs4.element.Tag, accessor: AccessorHandler) -> str: | |||
"""ツイートの画像や動画を取得する。 | |||
Args: | |||
tweet bs4.element.Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | |||
accessor AccessorHandler: アクセスハンドラ | |||
Returns: | |||
str: Wiki記法でのファイルへのリンクの文字列。 | |||
""" | |||
tweet_media: bs4.element.Tag | None = tweet.select_one('.tweet-body > .attachments') # 引用リツイート内のメディアを選択しないように.tweet-body直下の.attachmentsを選択 | |||
media_txt: str = '' | |||
if tweet_media is not None: | |||
media_list: list[str] = [] | |||
# ツイートの画像の取得 | |||
for image_a in tweet_media.select('.attachment.image a'): | |||
try: | |||
media_name: str = [group for group in re.search(r'%2F([^%]*\.jpg)|%2F([^%]*\.jpeg)|%2F([^%]*\.png)|%2F([^%]*\.gif)', image_a.get('href')).groups() if group][0] | |||
media_list.append(f"[[ファイル:{media_name}|240px]]") | |||
if self._download_media(media_name, accessor): | |||
print(os.path.join(self.MEDIA_DIR, media_name) + ' をアップロードしなければない。') | |||
else: | |||
print(urljoin('https://pbs.twimg.com/media/', media_name) + ' をアップロードしなければない。') | |||
except AttributeError: | |||
tweet_url: str = urljoin(self.TWITTER_URL, re.sub('#[^#]*$', '', tweet.find(class_='tweet-link').get('href'))) ##ツイートのURL作成 | |||
print(f"{tweet_url}の画像が取得できませんでしたを 当職無能") | |||
media_list.append(f"[[ファイル:(画像の取得ができませんでした)|240px]]") | |||
# ツイートの動画の取得。ffmpegが入っていなければ手動で取得すること | |||
for i, video_container in enumerate(tweet_media.select('.attachment.video-container')): | |||
tweet_url: str = urljoin(self.TWITTER_URL, re.sub('#[^#]*$', '', tweet.find(class_='tweet-link').get('href'))) ##ツイートのURL作成 | |||
video = video_container.select_one('video') | |||
if video is None: | |||
print(f"{tweet_url}の動画が取得できませんでしたを 当職無能") | |||
media_list.append(f"[[ファイル:(動画の取得ができませんでした)|240px]]") | |||
continue | |||
if subprocess.run(['which', 'ffmpeg'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode != 0: | |||
print(f"ffmpegがないため{tweet_url}の動画が取得できませんでしたを") | |||
media_list.append(f"[[ファイル:(動画の取得ができませんでした)|240px]]") | |||
else: # ffmpegがある場合 | |||
media_url: str = unquote(re.search(r'[^\/]+$', video.get('data-url')).group(0)) | |||
tweet_id: str = tweet_url.split('/')[-1] | |||
# 動画のダウンロード | |||
if accessor.proxies is not None: | |||
returncode: int = subprocess.run(["ffmpeg", "-y", "-http_proxy", "accessor.proxies['http']", "-i", urljoin(self.NITTER_INSTANCE, media_url), "-c", "copy", f"{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts"], stdout=subprocess.DEVNULL).returncode | |||
else: | |||
returncode: int = subprocess.run(["ffmpeg", "-y", "-i", urljoin(self.NITTER_INSTANCE, media_url), "-c", "copy", f"{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts"], stdout=subprocess.DEVNULL).returncode | |||
# 取得成功したらtsをmp4に変換 | |||
if returncode == 0: | |||
ts2mp4_returncode: int = subprocess.run(["ffmpeg", "-y", "-i", f"{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts", "-acodec", "copy", "-vcodec", "copy", f"{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4"], stdout=subprocess.DEVNULL).returncode | |||
if ts2mp4_returncode == 0: | |||
print(f"{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4をアップロードしなければない。") | |||
else: | |||
print(f"{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.tsをmp4に変換してアップロードしなければない。") | |||
media_list.append(f"[[ファイル:{tweet_id}_{i}.mp4|240px]]") | |||
else: | |||
print(f"{tweet_url}の動画が取得できませんでしたを 当職無能") | |||
media_list.append(f"[[ファイル:(動画の取得ができませんでした)|240px]]") | |||
media_txt = ' '.join(media_list) | |||
return media_txt | |||
def _get_tweet_quote(self, tweet: bs4.element.Tag, accessor: AccessorHandler) -> str: | |||
"""引用リツイートの引用元へのリンクを取得する。 | |||
Args: | |||
tweet bs4.element.Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | |||
accessor AccessorHandler: アクセスハンドラ | |||
Returns: | |||
str: Archiveテンプレートでラップされた引用元ツイートへのリンク。 | |||
""" | |||
tweet_quote: Final[bs4.element.Tag | None] = tweet.select_one('.tweet-body > .quote.quote-big') # 引用リツイートを選択 | |||
quote_txt: str = '' | |||
if tweet_quote is not None: | |||
link: str = tweet_quote.select_one('.quote-link').get('href') | |||
link = re.sub('#.*$', '', link) | |||
link = urljoin(self.TWITTER_URL, link) | |||
quote_txt = self._archive_url(link, accessor) | |||
tweet_quote_unavailable: Final[bs4.element.Tag | None] = tweet.select_one('.tweet-body > .quote.unavailable') # 引用リツイートを選択 | |||
if tweet_quote_unavailable is not None: | |||
quote_txt = '(引用元が削除されました)' | |||
return quote_txt | |||
def _get_tweet_poll(self, tweet: bs4.element.Tag) -> str: | |||
"""ツイートの投票結果を取得する。 | |||
Args: | |||
tweet bs4.element.Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | |||
Returns: | |||
str: Wiki形式に書き直した投票結果。 | |||
""" | |||
tweet_poll: Final[bs4.element.Tag | None] = tweet.select_one('.tweet-body > .poll') | |||
poll_txt: str = '' | |||
if tweet_poll is not None: | |||
poll_meters: Final[bs4.element.ResultSet] = tweet_poll.select('.poll-meter') | |||
for poll_meter in poll_meters: | |||
ratio: str = poll_meter.select_one('.poll-choice-value').text | |||
if 'leader' in poll_meter['class']: | |||
poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgba(29, 155, 240, 0.58) 0 {ratio}, transparent {ratio} 100%); font-weight: bold;">' + ratio + ' ' + poll_meter.select_one('.poll-choice-option').text + '</span>' | |||
else: | |||
poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgb(207, 217, 222) 0 {ratio}, transparent {ratio} 100%);">' + ratio + ' ' + poll_meter.select_one('.poll-choice-option').text + '</span>' | |||
poll_txt += '<br>\n <span style="font-size: small;">' + tweet_poll.select_one('.poll-info').text + '</span>' | |||
return poll_txt | |||
def _get_timeline_items(self, soup: BeautifulSoup) -> list[bs4.element.Tag]: | |||
"""タイムラインのツイートを取得。 | |||
基本的に投稿時刻の降順に取得し、リプライツリーは最後のツイートの時刻を基準として降順にひとまとまりにする。 | |||
Args: | |||
soup BeautifulSoup: Nitterのページを表すBeautifulSoupオブジェクト。 | |||
Returns: | |||
list[bs4.element.Tag]: ツイートのアイテムである ``.timeline-item`` タグを表すbs4.element.Tagオブジェクトのリスト。 | |||
""" | |||
timeline_item_list: list[bs4.element.Tag] = [] | |||
for item_or_list in soup.select('.timeline > .timeline-item, .timeline > .thread-line'): | |||
if 'unavailable' in item_or_list.attrs['class']: | |||
continue | |||
elif 'thread-line' in item_or_list.attrs['class']: | |||
# そのままtimeline-itemクラスをfind_allするとツイートの順番が逆転するので、順番通りに取得するよう処理 | |||
for item in reversed(item_or_list.select('.timeline-item')): | |||
timeline_item_list.append(item) | |||
else: | |||
timeline_item_list.append(item_or_list) | |||
return timeline_item_list | |||
def _get_tweet(self, accessor: AccessorHandler) -> None | NoReturn: | |||
"""ページからツイート本文を ``self._txt_data`` に収めていく。 | |||
ツイートの記録を中断するための文を発見するか、記録したツイート数が :const:`~LIMIT_N_TWEETS` 件に達したら終了する。 | |||
Args: | |||
accessor AccessorHandler: アクセスハンドラ | |||
""" | |||
soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser') ##beautifulsoupでレスポンス解析 | |||
tweets: Final[list[bs4.element.Tag]] = self._get_timeline_items(soup) ##一ツイートのブロックごとにリストで取得 | |||
for tweet in tweets: ##一ツイート毎に処理 | |||
if tweet.a.text == self.NEWEST: ##Load Newestのボタンは処理しない | |||
continue | |||
if tweet.find(class_='retweet-header') is not None: ##retweet-headerはリツイートを示すので入っていれば処理しない | |||
continue | |||
if tweet.find(class_='pinned') is not None: ##pinnedは固定ツイートを示すので入っていれば処理しない | |||
continue | |||
if len(self.query_strs) > 0: # クエリが指定されている場合、一つでも含まないツイートは処理しない、TODO: 未テスト | |||
not_match: bool = False | |||
for query_str in query_strs: | |||
if query_str not in tweet.text: | |||
not_match = True | |||
break | |||
if not_match: | |||
continue | |||
tweet_url: str = urljoin(self.TWITTER_URL, re.sub('#[^#]*$', '', tweet.find(class_='tweet-link').get('href'))) ##ツイートのURL作成 | |||
date: datetime = self._tweet_date(tweet) | |||
if date.year != self._date.year or date.month != self._date.month or date.day != self._date.day: | |||
self._next_day(date) | |||
archived_tweet_url: str = self._callinshowlink_url(tweet_url, accessor) ##ツイートURLをテンプレートCallinShowlinkに変化 | |||
tweet_content: bs4.element.Tag = tweet.find(class_='tweet-content media-body') ##ツイートの中身だけ取り出す | |||
self._archive_soup(tweet_content, accessor) ##ツイートの中身のリンクをテンプレートArchiveに変化 | |||
media_txt: str = self._get_tweet_media(tweet, accessor) ##ツイートに画像などのメディアを追加 | |||
quote_txt: str = self._get_tweet_quote(tweet, accessor) ##引用リツイートの場合、元ツイートを追加 | |||
poll_txt: str = self._get_tweet_poll(tweet) ##投票の取得 | |||
self._txt_data[0] = '!' + archived_tweet_url + '\n|-\n|\n' \ | |||
+ '<br>\n'.join(filter(None, [ | |||
self._escape_wiki_reserved_words(tweet_content.get_text()), | |||
quote_txt, | |||
media_txt, | |||
poll_txt | |||
])) \ | |||
+ '\n|-\n' \ | |||
+ self._txt_data[0] ##wikiの文法に変化 | |||
self._limit_count += 1 ##記録回数をカウント | |||
if self._limit_count % self.REPORT_INTERVAL == 0: | |||
print(f"ツイートを{self._limit_count}件も記録したンゴwwwwwwwwwww") | |||
if self._stop != '' and self._stop in tweet_content.get_text(): ##目的ツイートか判定 | |||
print("目的ツイート発見でもう尾張屋根") | |||
self._make_txt() | |||
if self._limit_count >= self.LIMIT_N_TWEETS: ##上限達成か判定 | |||
print(f"{self.LIMIT_N_TWEETS}件も記録している。もうやめにしませんか。") | |||
self._make_txt() | |||
def _escape_wiki_reserved_words(self, text: str) -> str: | |||
"""MediaWikiの文法と衝突する文字を無効化する。 | |||
Args: | |||
text str: ツイートの文字列。 | |||
Returns: | |||
str: MediaWikiの文法と衝突する文字がエスケープされたツイートの文字列。 | |||
""" | |||
def escape_nolink_urls(text: str) -> str: | |||
"""Archiveテンプレートの中にないURLがWikiでaタグに変換されないよう無効化する。 | |||
Args: | |||
text str: ツイートの文字列。 | |||
Returns: | |||
str: Archiveテンプレートの中にないURLがnowikiタグでエスケープされた文字列。 | |||
""" | |||
is_in_archive_template: bool = False | |||
i: int = 0 | |||
while i < len(text): | |||
if is_in_archive_template: | |||
if text[i:i+2] == '}}': | |||
is_in_archive_template = False | |||
i += 2 | |||
else: | |||
if text[i:i+10] == '{{Archive|' or text[i:i+10] == '{{archive|': | |||
is_in_archive_template = True | |||
i += 10 | |||
elif text[i:i+8] == 'https://': | |||
text = text[:i] + '<nowiki>https://</nowiki>' + text[i+8:] | |||
i += 25 | |||
elif text[i:i+7] == 'http://': | |||
text = text[:i] + '<nowiki>http://</nowiki>' + text[i+7:] | |||
i += 24 | |||
i += 1 | |||
return text | |||
text = text.replace('\n', '<br>\n') | |||
text = re.sub(r'^ ', ' ', text, flags=re.MULTILINE) | |||
text = re.sub(r'^([\*#:;])', r'<nowiki>\1</nowiki>', text, flags=re.MULTILINE) | |||
text = re.sub(r'^----', '<nowiki>----</nowiki>', text, flags=re.MULTILINE) | |||
text = escape_nolink_urls(text) | |||
return text | |||
def _archive_soup(self, tag: bs4.element.Tag, accessor: AccessorHandler) -> None: | |||
"""ツイート内のaタグをテンプレートArchiveの文字列に変化させる。 | |||
NitterリンクをYouTubeへのリンクに、bibliogramへのリンクをInstagramへのリンクに修正する。 | |||
Args: | |||
tag bs4.element.Tag: ツイート内容の本文である ``.media-body`` タグを表すbeautifulsoup4タグ。 | |||
accessor AccessorHandler: アクセスハンドラ | |||
""" | |||
urls_in_tweet: Final[bs4.element.ResultSet] = tag.find_all('a') | |||
for url in urls_in_tweet: | |||
if url.get('href').startswith('https://') or url.get('href').startswith('http://'): ##先頭にhttpが付いていない物はハッシュタグの検索ページへのリンクなので処理しない | |||
if url.get('href').startswith('https' + self.NITTER_INSTANCE[4:]): | |||
#Nitter上のTwitterへのリンクを直す | |||
url_link: str = url.get('href').replace('https' + self.NITTER_INSTANCE[4:], self.TWITTER_URL) | |||
url_link = re.sub('\?.*$', '', url_link) | |||
url.replace_with(self._archive_url(url_link, accessor)) ##テンプレートArchiveに変化 | |||
elif url.get('href').startswith('https://nitter.kavin.rocks/'): | |||
#Nitter上のTwitterへのリンクを直す | |||
url_link: str = url.get('href').replace('https://nitter.kavin.rocks/', self.TWITTER_URL) | |||
url_link = re.sub('\?.*$', '', url_link) | |||
url.replace_with(self._archive_url(url_link, accessor)) ##テンプレートArchiveに変化 | |||
elif self._invidious_pattern.search(url.get('href')): | |||
#Nitter上のYouTubeへのリンクをInvidiousのものから直す | |||
url_link: str = url.get('href') | |||
if re.match('https://[^/]+/[^/]+/', url_link) or re.search('/@[^/]*$', url_link): | |||
url_link = self._invidious_pattern.sub('youtube.com', url_link) | |||
else: | |||
url_link = self._invidious_pattern.sub('youtu.be', url_link) | |||
url.replace_with(self._archive_url(url_link, accessor)) ##テンプレートArchiveに変化 | |||
elif url.get('href').startswith('https://bibliogram.art/'): | |||
# Nitter上のInstagramへのリンクをBibliogramのものから直す | |||
# Bibliogramは中止されたようなのでそのうちリンクが変わるかも | |||
url_link: str = url.get('href').replace('https://bibliogram.art/', 'https://www.instagram.com/') | |||
url.replace_with(self._archive_url(url_link, accessor)) ##テンプレートArchiveに変化 | |||
else: | |||
url.replace_with(self._archive_url(url.get('href'), accessor)) ##テンプレートArchiveに変化 | |||
elif url.text.startswith('@'): | |||
url_link: str = urljoin(self.TWITTER_URL, url.get('href')) | |||
url_text: str = url.text | |||
url.replace_with(self._archive_url(url_link, accessor, url_text)) ##テンプレートArchiveに変化 | |||
def _archive_url(self, url: Final[str], accessor: AccessorHandler, text: Final[str|None] = None) -> str: | |||
"""URLをArchiveテンプレートでラップする。 | |||
フラグメント識別子がURLに含まれていたら、Archive側のURLにも反映させる。 | |||
Args: | |||
url Final[str]: ラップするURL。 | |||
accessor AccessorHandler: アクセスハンドラ | |||
text Final[str|None]: ArchiveテンプレートでURLの代わりに表示する文字列。 | |||
Returns: | |||
str: ArchiveタグでラップしたURL。 | |||
""" | |||
if '#' in url: # フラグメント識別子の処理 | |||
main_url, fragment = url.split('#', maxsplit=1) | |||
if text is None: | |||
return '{{Archive|1=' + unquote(url) + '|2=' + self._archive(main_url, accessor) + '#' + fragment + '}}' ##テンプレートArchiveの文字列返す | |||
else: | |||
return '{{Archive|1=' + unquote(url) + '|2=' + self._archive(main_url, accessor) + '#' + fragment + + '|3=' + text + '}}' ##テンプレートArchiveの文字列返す | |||
else: | |||
if text is None: | |||
return '{{Archive|1=' + unquote(url) + '|2=' + self._archive(url, accessor) + '}}' ##テンプレートArchiveの文字列返す | |||
else: | |||
return '{{Archive|1=' + unquote(url) + '|2=' + self._archive(url, accessor) + '|3=' + text + '}}' ##テンプレートArchiveの文字列返す | |||
def _callinshowlink_url(self, url: Final[str], accessor: AccessorHandler) -> str: | |||
"""URLをCallinShowLinkテンプレートでラップする。 | |||
Args: | |||
url Final[str]: ラップするURL。 | |||
accessor AccessorHandler: アクセスハンドラ | |||
Returns: | |||
str: CallinShowLinkタグでラップしたURL。 | |||
""" | |||
return '{{CallinShowLink|1=' + url + '|2=' + self._archive(url, accessor) + '}}' | |||
def _archive(self, url: Final[str], accessor: AccessorHandler) -> str: | |||
"""URLから対応するarchive.todayのURLを返す。 | |||
取得できれば魚拓ページのURLを返す。 | |||
魚拓がなければその旨の警告を表示し、https://archive.ph/https%3A%2F%2Fxxxxxxxxの形で返される。 | |||
アクセスに失敗すればその旨の警告を表示し、https://archive.ph/https%3A%2F%2Fxxxxxxxxの形で返される。 | |||
Args: | |||
url Final[str]: 魚拓を取得するURL。 | |||
accessor AccessorHandler: アクセスハンドラ | |||
Returns: | |||
str: 魚拓のURL。 | |||
""" | |||
archive_url: str = urljoin(self.ARCHIVE_TODAY_STANDARD, quote(unquote(url), safe='&=+?%')) ##wikiに載せるとき用URLで失敗するとこのままhttps://archive.ph/https%3A%2F%2Fxxxxxxxxの形で返される | |||
res: Final[str | None] = accessor.request(urljoin(self.ARCHIVE_TODAY, quote(unquote(url), safe='&=+?%'))) ##アクセス用URL使って結果を取得 | |||
if res is None: ##魚拓接続失敗時処理 | |||
print(archive_url + 'にアクセス失敗ナリ。出力されるテキストにはそのまま記載されるナリ。') | |||
else: | |||
soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') ##beautifulsoupでレスポンス解析 | |||
content: bs4.element.Tag = soup.find(id="CONTENT") ##archive.todayの魚拓一覧ページの中身だけ取得 | |||
if content is None or content.get_text()[:len(self.NO_ARCHIVE)] == self.NO_ARCHIVE: ##魚拓があるかないか判定 | |||
print(url + "の魚拓がない。これはいけない。") | |||
else: | |||
archive_url = content.find('a').get('href').replace(self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD) | |||
return archive_url | |||
def _go_to_new_page(self, accessor: AccessorHandler) -> None | NoReturn: | |||
"""Nitterで次のページに移動する。 | |||
次のページが無ければプログラムを終了する。 | |||
Args: | |||
accessor AccessorHandler: アクセスハンドラ | |||
""" | |||
soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser') ##beautifulsoupでレスポンス解析 | |||
show_mores: Final[bs4.element.ResultSet] = soup.find_all(class_="show-more") | |||
new_url: str = '' # ここで定義しないと動かなくなった、FIXME? | |||
for show_more in show_mores: ##show-moreに次ページへのリンクか前ページへのリンクがある | |||
if show_more.text != self.NEWEST: ##前ページへのリンクではないか判定 | |||
new_url = urljoin(self.NITTER_INSTANCE, self.CALLINSHOW + '/' + self.TWEETS_OR_REPLIES + show_more.a.get('href')) ##直下のaタグのhrefの中身取ってURL頭部分と合体 | |||
res: Final[str | None] = accessor.request(new_url) ##接続してHTML取ってくる | |||
if res is None: | |||
self._fail() | |||
new_page_soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') ##beautifulsoupでレスポンス解析 | |||
if new_page_soup.find(class_="timeline-end") is None: ##ツイートの終端ではtimeline-endだけのページになるので判定 | |||
print(new_url + 'に移動しますを') | |||
self._page = res ##まだ残りツイートがあるのでページを返して再度ツイート本文収集 | |||
else: | |||
print("急に残りツイートが無くなったな終了するか") | |||
self._make_txt() | |||
def execute(self, krsw: bool=False, use_browser: bool=True, enable_javascript: bool=True) -> NoReturn: | |||
"""通信が必要な部分のロジック。 | |||
Args: | |||
krsw bool: Trueの場合、名前が自動で :const:`~CALLINSHOW` になり、クエリと終わりにするツイートが自動で無しになる。 | |||
use_browser bool: TrueならSeleniumを利用する。FalseならRequestsのみでアクセスする。 | |||
enable_javascript bool: SeleniumでJavaScriptを利用する場合はTrue。 | |||
""" | |||
# Seleniumドライバーを必ず終了するため、with文を利用する。 | |||
with AccessorHandler(use_browser, enable_javascript) as accessor: | |||
# 実行前のチェック | |||
self._check_nitter_instance(accessor) | |||
self._check_archive_instance(accessor) | |||
##Invidiousのインスタンスリストの正規表現パターンを取得 | |||
invidious_url_tuple: Final[tuple[str]] = self._invidious_instances(accessor) | |||
self._invidious_pattern: Final[re.Pattern] = re.compile('|'.join(invidious_url_tuple)) | |||
# 検索クエリの設定 | |||
self._set_queries(accessor, krsw) | |||
# ツイートを取得し終えるまでループ | |||
while True: | |||
self._get_tweet(accessor) | |||
self._go_to_new_page(accessor) | |||
if __name__ == '__main__': | if __name__ == '__main__': | ||
if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < 10): | |||
print('Pythonのバージョンを3.10以上に上げて下さい', file=sys.stderr) | |||
exit(1) | |||
parser: ArgumentParser = ArgumentParser() | |||
parser.add_argument("--krsw", action='store_true', help="指定すると、パカデブのツイートを取得上限数まで取得する。") | |||
parser.add_argument("-n", "--no_use_browser", action='store_true', help="指定すると、Tor Browserを利用しない。") | |||
parser.add_argument("-d", "--disable_script", action='store_true', help="指定すると、Tor BrowserでJavaScriptを利用しない。") | |||
args: Namespace = parser.parse_args() | |||
twitter_archiver: TwitterArchiver = TwitterArchiver() | |||
twitter_archiver.execute(args.krsw, not args.no_use_browser, not args.disable_script) | |||
</syntaxhighlight> | </syntaxhighlight> | ||