→コード: v4.0.3 pythonのバージョン指定を3.11以上に変更テーブル組み立てを別のクラスに逃す
>Fet-Fe (→コード: v4.0.2 JavaScriptをオフにするオプションと、Tor Browserを利用しないオプションを追加) |
>Fet-Fe (→コード: v4.0.3 pythonのバージョン指定を3.11以上に変更テーブル組み立てを別のクラスに逃す) |
||
7行目: | 7行目: | ||
"""Twitter自動収集スクリプト | """Twitter自動収集スクリプト | ||
ver4.0. | ver4.0.3 2023/9/28恒心 | ||
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です | 当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です | ||
23行目: | 23行目: | ||
$ python3 (ファイル名) --krsw | $ python3 (ファイル名) --krsw | ||
``-- | ``--no_browser`` オプションでTor Browserを使用しないモードに、 | ||
``--disable_script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。 | |||
自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。 | 自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。 | ||
29行目: | 30行目: | ||
Note: | Note: | ||
* Pythonのバージョンは3. | * Pythonのバージョンは3.11以上 | ||
* 環境は玉葱前提です。 | * 環境は玉葱前提です。 | ||
* Whonix-Workstation, MacOSで動作確認済 | * Whonix-Workstation, MacOSで動作確認済 | ||
45行目: | 46行目: | ||
""" | """ | ||
import codecs | import codecs | ||
import json | import json | ||
import logging | |||
import os | |||
import platform | |||
import random | import random | ||
import re | |||
import subprocess | |||
import sys | |||
import warnings | |||
from argparse import ArgumentParser, Namespace | |||
from collections.abc import Callable | |||
from datetime import datetime | from datetime import datetime | ||
from | from logging import Logger, getLogger | ||
from time import sleep | from time import sleep | ||
from types import MappingProxyType | from types import MappingProxyType, TracebackType | ||
from typing import Final, NoReturn | from typing import Any, Final, NoReturn, Self | ||
from urllib.parse import quote, unquote, urljoin | from urllib.parse import quote, unquote, urljoin | ||
from zoneinfo import ZoneInfo | |||
from | |||
import requests | import requests | ||
from bs4 import BeautifulSoup | from bs4 import BeautifulSoup | ||
from bs4.element import NavigableString, ResultSet, Tag | |||
from selenium import webdriver | from selenium import webdriver | ||
from selenium.common.exceptions import NoSuchElementException, WebDriverException | from selenium.common.exceptions import (NoSuchElementException, | ||
WebDriverException) | |||
from selenium.webdriver.common.by import By | |||
from selenium.webdriver.firefox.options import Options | from selenium.webdriver.firefox.options import Options | ||
from selenium.webdriver.support import expected_conditions as ec | |||
from selenium.webdriver.support import expected_conditions as | from selenium.webdriver.support.wait import WebDriverWait | ||
from selenium.webdriver. | |||
logging.basicConfig(level=logging.INFO, | |||
format='{asctime} [{levelname:.4}] : {message}', style='{') | |||
logger: Logger = getLogger(__name__) | logger: Logger = getLogger(__name__) | ||
# おそらくはツイートの内容によってMarkupResemblesLocatorWarningが吐き出されることがあるので無効化 | |||
warnings.simplefilter('ignore') | warnings.simplefilter('ignore') | ||
87行目: | 91行目: | ||
class | class ReCaptchaRequiredError(Exception): | ||
"""JavaScriptがオフの時にreCAPTCHAを要求された場合のエラー。 | """JavaScriptがオフの時にreCAPTCHAを要求された場合のエラー。 | ||
""" | """ | ||
118行目: | 122行目: | ||
自動操縦だとWebサイトに見破られないため。 | 自動操縦だとWebサイトに見破られないため。 | ||
""" | """ | ||
sleep(random.randrange(self.WAIT_TIME, self.WAIT_TIME + self.WAIT_RANGE)) | sleep(random.randrange(self.WAIT_TIME, | ||
self.WAIT_TIME + self.WAIT_RANGE)) | |||
153行目: | 158行目: | ||
""" | """ | ||
self._proxies: dict[str, str] | None = None | self._proxies: dict[str, str] | None = None | ||
self._proxies = self. | self._proxies = self._choose_tor_proxies() # Torに必要なプロキシをセット | ||
def _execute(self, url: | def _execute(self, | ||
url: str, | |||
proxies: dict[str, | |||
str] | None) -> requests.models.Response: | |||
"""引数のURLにrequestsモジュールでHTTP接続する。 | """引数のURLにrequestsモジュールでHTTP接続する。 | ||
Args: | Args: | ||
url | url str: 接続するURL。 | ||
proxies dir[str, str]: | proxies dir[str, str]: 接続に利用するプロキシ。 | ||
デフォルトでは :func:`~_choose_tor_proxies` で設定した値を利用する。 | |||
Returns: | Returns: | ||
169行目: | 178行目: | ||
AccessError: ステータスコードが200でない場合のエラー。 | AccessError: ステータスコードが200でない場合のエラー。 | ||
""" | """ | ||
sleep(self.WAIT_TIME) | sleep(self.WAIT_TIME) # DoS対策で待つ | ||
try: | try: | ||
res: requests.models.Response = requests.get(url, timeout=self.REQUEST_TIMEOUT, headers=self.HEADERS, allow_redirects=False, proxies=proxies if proxies is not None else self._proxies) | res: requests.models.Response = requests.get( | ||
res.raise_for_status() | url, | ||
timeout=self.REQUEST_TIMEOUT, | |||
headers=self.HEADERS, | |||
allow_redirects=False, | |||
proxies=proxies if proxies is not None else self._proxies) | |||
res.raise_for_status() # HTTPステータスコードが200番台以外でエラー発生 | |||
except requests.exceptions.ConnectionError: | except requests.exceptions.ConnectionError: | ||
raise | raise | ||
except requests.exceptions.RequestException as e: | except requests.exceptions.RequestException as e: | ||
# requestsモジュール固有の例外を共通の例外に変換 | # requestsモジュール固有の例外を共通の例外に変換 | ||
raise AccessError(str(e)) from | raise AccessError(str(e)) from e | ||
return res | return res | ||
def get(self, url: | def get(self, | ||
url: str, | |||
proxies: dict[str, | |||
str] | None = None) -> str: | |||
"""引数のURLにrequestsモジュールでHTTP接続する。 | """引数のURLにrequestsモジュールでHTTP接続する。 | ||
Args: | Args: | ||
url | url str: 接続するURL。 | ||
proxies dir[str, str]: 接続に利用するプロキシ。デフォルトでは :func:`~ | proxies dir[str, str]: 接続に利用するプロキシ。デフォルトでは :func:`~_choose_tor_proxies` で設定した値を利用する。 | ||
Returns: | Returns: | ||
199行目: | 216行目: | ||
raise | raise | ||
def get_image(self, url: | def get_image(self, url: str) -> bytes | None: | ||
"""引数のURLから画像のバイナリ列を取得する。 | """引数のURLから画像のバイナリ列を取得する。 | ||
Args: | Args: | ||
url | url str: 接続するURL | ||
Returns: | Returns: | ||
222行目: | 239行目: | ||
return None | return None | ||
def | def _choose_tor_proxies(self) -> dict[str, str] | None | NoReturn: | ||
"""Torを使うのに必要なプロキシ情報を返す。 | """Torを使うのに必要なプロキシ情報を返す。 | ||
プロキシなしで接続できればNone、 | |||
Tor Browserのプロキシで接続できるなら :const:`~PROXIES_WITH_BROWSER`、 | |||
torコマンドのプロキシで接続できるなら :const:`~PROXIES_WITH_COMMAND`。 | |||
いずれでもアクセスできなければ異常終了する。 | いずれでもアクセスできなければ異常終了する。 | ||
234行目: | 253行目: | ||
RuntimeError: :const:`~TOR_CHECK_URL` にアクセスできない場合のエラー。 | RuntimeError: :const:`~TOR_CHECK_URL` にアクセスできない場合のエラー。 | ||
""" | """ | ||
logger.info('Torのチェック中ですを') | |||
# プロキシなしでTorにアクセスできるかどうか | # プロキシなしでTorにアクセスできるかどうか | ||
res: str = self.get(self.TOR_CHECK_URL, proxies=None) | res: str = self.get(self.TOR_CHECK_URL, proxies=None) | ||
is_tor: bool = json.loads(res)['IsTor'] | is_tor: bool = json.loads(res)['IsTor'] | ||
if is_tor: | if is_tor: | ||
logger.info('Tor connection OK') | |||
return None | return None | ||
# Tor BrowserのプロキシでTorにアクセスできるかどうか | # Tor BrowserのプロキシでTorにアクセスできるかどうか | ||
try: | try: | ||
res = self.get(self.TOR_CHECK_URL, proxies=self.PROXIES_WITH_BROWSER) | res = self.get(self.TOR_CHECK_URL, | ||
proxies=self.PROXIES_WITH_BROWSER) | |||
is_tor = json.loads(res)['IsTor'] | is_tor = json.loads(res)['IsTor'] | ||
if is_tor: | if is_tor: | ||
logger.info('Tor connection OK') | |||
return self.PROXIES_WITH_BROWSER | return self.PROXIES_WITH_BROWSER | ||
except requests.exceptions.ConnectionError: | except requests.exceptions.ConnectionError: | ||
254行目: | 274行目: | ||
# torコマンドのプロキシでTorにアクセスできるかどうか | # torコマンドのプロキシでTorにアクセスできるかどうか | ||
try: | try: | ||
res = self.get(self.TOR_CHECK_URL, proxies=self.PROXIES_WITH_COMMAND) | res = self.get(self.TOR_CHECK_URL, | ||
proxies=self.PROXIES_WITH_COMMAND) | |||
is_tor = json.loads(res)['IsTor'] | is_tor = json.loads(res)['IsTor'] | ||
if is_tor: | if is_tor: | ||
logger.info('Tor proxy OK') | |||
return self.PROXIES_WITH_COMMAND | return self.PROXIES_WITH_COMMAND | ||
else: | else: | ||
raise RuntimeError('サイトにTorのIPでアクセスできていないなりを') | raise RuntimeError('サイトにTorのIPでアクセスできていないなりを') | ||
except requests.exceptions.ConnectionError as e: | except requests.exceptions.ConnectionError as e: | ||
logger.critical(e) | |||
logger.critical('通信がTorのSOCKS proxyを経由していないなりを') | |||
exit(1) | exit(1) | ||
281行目: | 302行目: | ||
TOR_BROWSER_PATHS: MappingProxyType[str, str] = MappingProxyType({ | TOR_BROWSER_PATHS: MappingProxyType[str, str] = MappingProxyType({ | ||
'Windows': '', | |||
'Darwin': '/Applications/Tor Browser.app/Contents/MacOS/firefox', | |||
'Linux': '/usr/bin/torbrowser' | |||
}) | }) | ||
"""MappingProxyType[str, str]: OSごとのTor Browserのパス。 | """MappingProxyType[str, str]: OSごとのTor Browserのパス。 | ||
309行目: | 330行目: | ||
if enable_javascript: | if enable_javascript: | ||
logger.warning('reCAPTCHA対策のためJavaScriptをonにしますを') | |||
options.preferences.update({ | options.preferences.update({ | ||
'javascript.enabled': enable_javascript, | |||
'intl.accept_languages': 'en-US, en', | |||
'intl.locale.requested': 'US', | |||
'font.language.group': 'x-western', | |||
'dom.webdriver.enabled': False # 自動操縦と見破られないための設定 | |||
}) | }) | ||
322行目: | 343行目: | ||
self.driver: webdriver.Firefox = webdriver.Firefox(options=options) | self.driver: webdriver.Firefox = webdriver.Firefox(options=options) | ||
sleep(1) | sleep(1) | ||
wait_init: WebDriverWait = WebDriverWait(self.driver, self.WAIT_TIME_FOR_INIT) | wait_init: WebDriverWait = WebDriverWait(self.driver, | ||
wait_init.until( | self.WAIT_TIME_FOR_INIT) | ||
self.driver.find_element(By.ID, | wait_init.until( | ||
wait_init.until( | ec.element_to_be_clickable((By.ID, 'connectButton')) | ||
) | |||
self.driver.find_element(By.ID, 'connectButton').click() | |||
wait_init.until(ec.url_contains('about:blank')) # Torの接続が完了するまで待つ | |||
self.wait: WebDriverWait = WebDriverWait(self.driver, self.REQUEST_TIMEOUT) | self.wait: WebDriverWait = WebDriverWait(self.driver, | ||
except | self.REQUEST_TIMEOUT) | ||
except BaseException: | |||
self.quit() | self.quit() | ||
raise | raise | ||
342行目: | 367行目: | ||
Raises: | Raises: | ||
ReCaptchaRequiredError: JavaScriptがオフの状態でreCAPTCHAが要求された場合のエラー。 | |||
Todo: | |||
botバレしたときに自動で他のTorサーキットに接続し直す。 | |||
""" | """ | ||
try: | try: | ||
self.driver.find_element(By.CSS_SELECTOR, 'script[src^="https://www.google.com/recaptcha/api.js"]') | self.driver.find_element( # 要素がない時に例外を吐く | ||
By.CSS_SELECTOR, | |||
'script[src^="https://www.google.com/recaptcha/api.js"]') | |||
if self._javascript_enabled: | if self._javascript_enabled: | ||
logger.warning('reCAPTCHAを解いてね(笑)、それはできるよね。') | |||
logger.warning('botバレしたらNew Tor circuit for this siteを選択するナリよ') | |||
WebDriverWait(self.driver, 10000).until( | WebDriverWait( | ||
sleep(self.WAIT_TIME) | self.driver, | ||
10000).until( | |||
ec.staleness_of( | |||
self.driver.find_element( | |||
By.CSS_SELECTOR, | |||
'script[src^="https://www.google.com/recaptcha/api.js"]'))) | |||
sleep(self.WAIT_TIME) # DoS対策で待つ | |||
else: | else: | ||
raise | raise ReCaptchaRequiredError( | ||
'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: ' | |||
+ self.driver.current_url) | |||
except NoSuchElementException: | except NoSuchElementException: | ||
# reCAPTCHAの要素がなければそのまま | # reCAPTCHAの要素がなければそのまま | ||
pass | pass | ||
def get(self, url: | def get(self, url: str) -> str: | ||
"""引数のURLにSeleniumでHTTP接続する。 | """引数のURLにSeleniumでHTTP接続する。 | ||
Args: | Args: | ||
url | url str: 接続するURL。 | ||
Returns: | Returns: | ||
str: レスポンスのHTML。 | str: レスポンスのHTML。 | ||
""" | """ | ||
self._random_sleep() | self._random_sleep() # DoS対策で待つ | ||
try: | try: | ||
self.driver.get(url) | self.driver.get(url) | ||
372行目: | 410行目: | ||
except WebDriverException as e: | except WebDriverException as e: | ||
# Selenium固有の例外を共通の例外に変換 | # Selenium固有の例外を共通の例外に変換 | ||
raise AccessError(str(e)) from | raise AccessError(str(e)) from e | ||
return self.driver.page_source | return self.driver.page_source | ||
399行目: | 437行目: | ||
enable_javascript bool: SeleniumでJavaScriptを利用する場合はTrue。 | enable_javascript bool: SeleniumでJavaScriptを利用する場合はTrue。 | ||
""" | """ | ||
self.selenium_accessor: SeleniumAccessor | None = SeleniumAccessor(enable_javascript) if use_browser else None | self.selenium_accessor: SeleniumAccessor | None = SeleniumAccessor( | ||
enable_javascript) if use_browser else None | |||
self.requests_accessor: RequestsAccessor = RequestsAccessor() | self.requests_accessor: RequestsAccessor = RequestsAccessor() | ||
def __enter__(self) -> Self: | def __enter__(self) -> Self: | ||
"""withブロックの開始時に実行する。 | """withブロックの開始時に実行する。 | ||
Returns: | |||
Self: オブジェクト自身。 | |||
""" | """ | ||
return self | return self | ||
def __exit__(self, | def __exit__(self, | ||
exc_type: type[BaseException] | None, | |||
exc_value: BaseException | None, | |||
traceback: TracebackType | None) -> None: | |||
"""withブロックの終了時に実行する。 | """withブロックの終了時に実行する。 | ||
Args: | |||
exc_type (type[BaseException] | None): コンテキスト内で例外を吐いた場合の例外タイプ。 | |||
exc_value (BaseException | None): コンテキスト内で例外を吐いた場合の例外。 | |||
traceback (TracebackType | None): コンテキスト内で例外を吐いた場合のトレースバック。 | |||
""" | """ | ||
if self.selenium_accessor is not None: | if self.selenium_accessor is not None: | ||
self.selenium_accessor.quit() | self.selenium_accessor.quit() | ||
def request_once(self, url: | def request_once(self, url: str) -> str: | ||
"""引数のURLにHTTP接続する。 | """引数のURLにHTTP接続する。 | ||
Args: | Args: | ||
url | url str: 接続するURL。 | ||
Returns: | Returns: | ||
436行目: | 486行目: | ||
raise | raise | ||
def _request_with_callable(self, url: | def _request_with_callable(self, | ||
url: str, | |||
request_callable: Callable[[str], | |||
Any]) -> Any | None: | |||
"""request_callableの実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | """request_callableの実行を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | ||
443行目: | 496行目: | ||
Args: | Args: | ||
url | url str: 接続するURL | ||
request_callable Callable[[str], Any]: 1回リクエストを行うメソッド。 | request_callable Callable[[str], Any]: 1回リクエストを行うメソッド。 | ||
Returns: | Returns: | ||
Any | None: レスポンス。接続失敗が何度も起きるとNoneを返す。 | Any | None: レスポンス。接続失敗が何度も起きるとNoneを返す。 | ||
""" | """ | ||
for i in range(1, self.LIMIT_N_REQUESTS + 1): | for i in range(1, self.LIMIT_N_REQUESTS + 1): | ||
456行目: | 506行目: | ||
res: Any = request_callable(url) | res: Any = request_callable(url) | ||
except AccessError: | except AccessError: | ||
logger.warning( | |||
url | |||
+ 'への通信失敗ナリ ' | |||
+ f'{i}/{self.LIMIT_N_REQUESTS}回') | |||
if i < self.LIMIT_N_REQUESTS: | if i < self.LIMIT_N_REQUESTS: | ||
sleep(self.WAIT_TIME_FOR_ERROR) | sleep(self.WAIT_TIME_FOR_ERROR) # 失敗時は長めに待つ | ||
else: | else: | ||
return res | return res | ||
return None | return None | ||
def request(self, url: | def request(self, url: str) -> str | None: | ||
"""HTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | """HTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | ||
470行目: | 523行目: | ||
Args: | Args: | ||
url | url str: 接続するURL | ||
Returns: | Returns: | ||
480行目: | 533行目: | ||
return self._request_with_callable(url, self.request_once) | return self._request_with_callable(url, self.request_once) | ||
def request_with_requests_module(self, url: | def request_with_requests_module(self, url: str) -> str | None: | ||
"""requestsモジュールでのHTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | """requestsモジュールでのHTTP接続を :const:`~LIMIT_N_REQUESTS` で定義された回数まで試す。 | ||
487行目: | 540行目: | ||
Args: | Args: | ||
url | url str: 接続するURL | ||
Returns: | Returns: | ||
497行目: | 550行目: | ||
return self._request_with_callable(url, self.requests_accessor.get) | return self._request_with_callable(url, self.requests_accessor.get) | ||
def request_image(self, url: | def request_image(self, url: str) -> bytes | None: | ||
"""requestsモジュールで画像ファイルを取得します。 | """requestsモジュールで画像ファイルを取得します。 | ||
509行目: | 562行目: | ||
bytes | None: レスポンスのバイト列。接続失敗が何度も起きるとNoneを返します。 | bytes | None: レスポンスのバイト列。接続失敗が何度も起きるとNoneを返します。 | ||
""" | """ | ||
return self._request_with_callable(url, self.requests_accessor.get_image) | return self._request_with_callable(url, | ||
self.requests_accessor.get_image) | |||
@property | @property | ||
def proxies(self) -> dict[str, str] | None: | def proxies(self) -> dict[str, str] | None: | ||
"""RequestsAccessorオブジェクトのプロキシ設定を返す。 | |||
Returns: | |||
dict[str, str] | None: RequestsAccessorオブジェクトのプロキシ設定。 | |||
""" | |||
return self.requests_accessor.proxies | return self.requests_accessor.proxies | ||
class TableBuilder: | |||
def __init__(self, date: datetime): | |||
self._tables: list[str] = [''] | |||
self._count: int = 0 # 記録数 | |||
self._date: datetime = date | |||
@property | |||
def count(self) -> int: | |||
return self._count | |||
def append(self, archived_tweet_url: str, text: str) -> None: | |||
self._tables[0] = '!' + archived_tweet_url + '\n|-\n|\n' \ | |||
+ text \ | |||
+ '\n|-\n' \ | |||
+ self._tables[0] # wikiの文法に変化 | |||
self._count += 1 | |||
def dump_file(self) -> NoReturn: | |||
"""Wikiテーブルをファイル出力し、プログラムを終了する。 | |||
""" | |||
self._next_day() | |||
result_txt: Final[str] = '\n'.join(self._tables) | |||
with codecs.open('tweet.txt', 'w', 'utf-8') as f: | |||
f.write(result_txt) | |||
logger.info('テキストファイル手に入ったやで〜') | |||
exit(0) | |||
def next_day_if_necessary(self, date: datetime | None = None) -> None: | |||
if date is None: | |||
return | |||
if date.year != self._date.year or date.month != self._date.month or date.day != self._date.day: | |||
self._next_day(date) | |||
def _next_day(self, date: datetime | None = None) -> None: | |||
if self._tables[0]: | |||
self._tables[0] = self._convert_to_text_table(self._tables[0]) | |||
if os.name == 'nt': # Windows | |||
self._txt_data[0] = self._date.strftime( | |||
'\n=== %#m月%#d日 ===\n') + self._txt_data[0] | |||
logger.info(self._date.strftime('%#m月%#d日のツイートを取得完了ですを')) | |||
else: # Mac or Linux | |||
self._tables[0] = self._date.strftime( | |||
'\n=== %-m月%-d日 ===\n') + self._tables[0] | |||
logger.info(self._date.strftime('%-m月%-d日のツイートを取得完了ですを')) | |||
if date is not None: | |||
self._tables.insert(0, '') | |||
self._date = date | |||
def _convert_to_text_table(self, text: str) -> str: | |||
"""``self._txt_data[0]`` にwikiでテーブル表示にするためのヘッダとフッタをつける。 | |||
Args: | |||
text str: ヘッダとフッタがないWikiテーブル。 | |||
Returns: | |||
str: テーブル表示用のヘッダとフッタがついたWikiテーブル。 | |||
""" | |||
return '{|class="wikitable" style="text-align: left;"\n' + text + '|}' | |||
@staticmethod | |||
def escape_wiki_reserved_words(text: str) -> str: | |||
"""MediaWikiの文法と衝突する文字を無効化する。 | |||
Args: | |||
text str: ツイートの文字列。 | |||
Returns: | |||
str: MediaWikiの文法と衝突する文字がエスケープされたツイートの文字列。 | |||
""" | |||
def escape_nolink_urls(text: str) -> str: | |||
"""Archiveテンプレートの中にないURLがWikiでaタグに変換されないよう無効化する。 | |||
Args: | |||
text str: ツイートの文字列。 | |||
Returns: | |||
str: Archiveテンプレートの中にないURLがnowikiタグでエスケープされた文字列。 | |||
""" | |||
is_in_archive_template: bool = False | |||
i: int = 0 | |||
while i < len(text): | |||
if is_in_archive_template: | |||
if text[i:i + 2] == '}}': | |||
is_in_archive_template = False | |||
i += 2 | |||
else: | |||
if text[i:i + 10] == '{{Archive|' or text[i:i + 10] == '{{archive|': | |||
is_in_archive_template = True | |||
i += 10 | |||
elif text[i:i + 8] == 'https://': | |||
text = text[:i] + \ | |||
'<nowiki>https://</nowiki>' + text[i + 8:] | |||
i += 25 | |||
elif text[i:i + 7] == 'http://': | |||
text = text[:i] + \ | |||
'<nowiki>http://</nowiki>' + text[i + 7:] | |||
i += 24 | |||
i += 1 | |||
return text | |||
text = text.replace('\n', '<br>\n') | |||
text = re.sub(r'^ ', ' ', text, flags=re.MULTILINE) | |||
text = re.sub( | |||
r'^([\*#:;])', | |||
r'<nowiki>\1</nowiki>', | |||
text, | |||
flags=re.MULTILINE) | |||
text = re.sub( | |||
r'^----', | |||
'<nowiki>----</nowiki>', | |||
text, | |||
flags=re.MULTILINE) | |||
text = escape_nolink_urls(text) | |||
return text | |||
@staticmethod | |||
def archive_url( | |||
url: str, | |||
archived_url: str, | |||
text: str | None = None) -> str: | |||
"""URLをArchiveテンプレートでラップする。 | |||
Args: | |||
url str: ラップするURL。 | |||
archive_url str: ラップするURLの魚拓のURL。 | |||
text str | None: ArchiveテンプレートでURLの代わりに表示する文字列。 | |||
Returns: | |||
str: ArchiveタグでラップしたURL。 | |||
""" | |||
if text is None: | |||
return '{{Archive|1=' + unquote(url) + '|2=' + archived_url + '}}' | |||
else: | |||
return '{{Archive|1=' + \ | |||
unquote(url) + '|2=' + archived_url + '|3=' + text + '}}' | |||
@staticmethod | |||
def callinshowlink_url(url: str, archived_url: str) -> str: | |||
"""URLをCallinShowLinkテンプレートでラップする。 | |||
Args: | |||
url str: ラップするURL。 | |||
archive_url str: ラップするURLの魚拓のURL。 | |||
Returns: | |||
str: CallinShowLinkタグでラップしたURL。 | |||
""" | |||
return '{{CallinShowLink|1=' + url + '|2=' + archived_url + '}}' | |||
563行目: | 773行目: | ||
""" | """ | ||
LIMIT_N_TWEETS: Final[int] = | LIMIT_N_TWEETS: Final[int] = 100 | ||
"""Final[int]: 取得するツイート数の上限。 | """Final[int]: 取得するツイート数の上限。 | ||
""" | """ | ||
573行目: | 783行目: | ||
TWEETS_OR_REPLIES: Final[str] = 'with_replies' | TWEETS_OR_REPLIES: Final[str] = 'with_replies' | ||
"""Final[str]: NitterのURLのドメインとユーザーネーム部分の接続部品。 | """Final[str]: NitterのURLのドメインとユーザーネーム部分の接続部品。 | ||
""" | """ | ||
603行目: | 810行目: | ||
"""コンストラクタ。 | """コンストラクタ。 | ||
""" | """ | ||
self._check_slash() | self._check_slash() # スラッシュが抜けてないかチェック | ||
def _set_queries(self, accessor: AccessorHandler, krsw: bool): | def _set_queries(self, accessor: AccessorHandler, krsw: bool) -> None: | ||
"""検索条件を設定する。 | """検索条件を設定する。 | ||
618行目: | 823行目: | ||
""" | """ | ||
# ユーザー名取得 | |||
if krsw: | if krsw: | ||
logger.info('名前は自動的に' + self.CALLINSHOW + 'にナリます') | |||
self._name: str = self.CALLINSHOW if krsw else self._get_name(accessor) | |||
# 検索クエリとページ取得 | |||
self.query_strs: list[str] = [] | self.query_strs: list[str] = [] | ||
if krsw: | if krsw: | ||
logger.info('クエリは自動的になしにナリます') | |||
else: | else: | ||
self._get_query() | self._get_query() | ||
page_optional: str | None = accessor.request( | |||
if | urljoin(self.NITTER_INSTANCE, self._name + '/' | ||
+ self.TWEETS_OR_REPLIES)) | |||
if page_optional is None: | |||
self._fail() | self._fail() | ||
self._page: str = page_optional | |||
# 終わりにするツイート取得 | |||
if krsw: | if krsw: | ||
logger.info('終わりにするツイートは自動的になしにナリます') | |||
self._stop: str = '' if krsw else self._stop_word() | |||
self. | # 日付取得 | ||
timeline_item: Tag | NavigableString | None = BeautifulSoup( | |||
self._page, 'html.parser').find( | |||
class_='timeline-item') | |||
assert isinstance(timeline_item, Tag) | |||
date: datetime = self._tweet_date(timeline_item) | |||
self._table_builder: TableBuilder = TableBuilder(date) | |||
self. | |||
def _check_slash(self) -> None | NoReturn: | def _check_slash(self) -> None | NoReturn: | ||
655行目: | 863行目: | ||
Raises: | Raises: | ||
RuntimeError: URLの最後にスラッシュがついていない場合に出る。 | RuntimeError: URLの最後にスラッシュがついていない場合に出る。 | ||
""" | """ | ||
if self.NITTER_INSTANCE[-1] != '/': | if self.NITTER_INSTANCE[-1] != '/': | ||
668行目: | 873行目: | ||
raise RuntimeError('TWITTER_URLの末尾には/が必須です') | raise RuntimeError('TWITTER_URLの末尾には/が必須です') | ||
def _check_nitter_instance(self, accessor: AccessorHandler) -> None | NoReturn: | def _check_nitter_instance( | ||
self, accessor: AccessorHandler) -> None | NoReturn: | |||
"""Nitterのインスタンスが生きているかチェックする。 | """Nitterのインスタンスが生きているかチェックする。 | ||
死んでいたらそこで終了。 | 死んでいたらそこで終了。 | ||
接続を一回しか試さない :func:`~_request_once` | 接続を一回しか試さない :func:`~_request_once` を使っているのは、 | ||
激重インスタンスが指定されたとき試行回数増やして偶然成功してそのまま実行されるのを躱すため。 | |||
Args: | Args: | ||
680行目: | 887行目: | ||
None | NoReturn: NitterにアクセスできればNone。できなければ終了。 | None | NoReturn: NitterにアクセスできればNone。できなければ終了。 | ||
""" | """ | ||
logger.info('Nitterのインスタンスチェック中ですを') | |||
try: | try: | ||
accessor.request_once(self.NITTER_INSTANCE) | accessor.request_once(self.NITTER_INSTANCE) | ||
except AccessError as e: | except AccessError as e: # エラー発生時は終了 | ||
logger.critical(e) | |||
logger.critical('インスタンスが死んでますを') | |||
exit(1) | exit(1) | ||
logger.info('Nitter OK') | |||
def _check_archive_instance(self, accessor: AccessorHandler) -> None | NoReturn: | def _check_archive_instance( | ||
self, accessor: AccessorHandler) -> None | NoReturn: | |||
"""archive.todayのTor用インスタンスが生きているかチェックする。 | """archive.todayのTor用インスタンスが生きているかチェックする。 | ||
698行目: | 906行目: | ||
None | NoReturn: archive.todayのTorインスタンスにアクセスできればNone。できなければ終了。 | None | NoReturn: archive.todayのTorインスタンスにアクセスできればNone。できなければ終了。 | ||
""" | """ | ||
logger.info('archive.todayのTorインスタンスチェック中ですを') | |||
try: | try: | ||
accessor.request_once(self.ARCHIVE_TODAY) | accessor.request_once(self.ARCHIVE_TODAY) | ||
except AccessError as e: | except AccessError as e: # エラー発生時は終了 | ||
logger.critical(e) | |||
logger.critical('インスタンスが死んでますを') | |||
exit(1) | exit(1) | ||
logger.info('archive.today OK') | |||
def _invidious_instances(self, accessor: AccessorHandler) -> tuple[str] | NoReturn: | def _invidious_instances( | ||
self, | |||
accessor: AccessorHandler) -> tuple[str, ...] | NoReturn: | |||
"""Invidiousのインスタンスのタプルを取得する。 | """Invidiousのインスタンスのタプルを取得する。 | ||
714行目: | 924行目: | ||
Returns: | Returns: | ||
tuple[str] | NoReturn: Invidiousのインスタンスのタプル。Invidiousのインスタンスが死んでいれば終了。 | tuple[str, ...] | NoReturn: Invidiousのインスタンスのタプル。Invidiousのインスタンスが死んでいれば終了。 | ||
""" | """ | ||
logger.info('Invidiousのインスタンスリストを取得中ですを') | |||
invidious_json: Final[str] | None = accessor.request_with_requests_module('https://api.invidious.io/instances.json') | invidious_json: Final[str] | None = accessor.request_with_requests_module( | ||
'https://api.invidious.io/instances.json') | |||
if invidious_json is None: | if invidious_json is None: | ||
logger.critical('Invidiousが死んでますを') | |||
exit(1) | exit(1) | ||
instance_list: list[str] = [] | instance_list: list[str] = [] | ||
744行目: | 955行目: | ||
""" | """ | ||
while True: | while True: | ||
logger.info( | |||
account_str: str = input() | 'アカウント名を入れなければない。空白だと自動的に' | ||
+ self.CALLINSHOW | |||
+ 'になりますを') | |||
account_str: str = input() | |||
# 空欄で降臨ショー | |||
if account_str == '': | if account_str == '': | ||
return self.CALLINSHOW | return self.CALLINSHOW | ||
else: | else: | ||
res: Final[str | res: Final[str | None] = accessor.request( | ||
if res is None: | urljoin(self.NITTER_INSTANCE, account_str)) | ||
if res is None: # リクエスト失敗判定 | |||
self._fail() | self._fail() | ||
soup: BeautifulSoup = BeautifulSoup(res, 'html.parser') | soup: BeautifulSoup = BeautifulSoup(res, 'html.parser') | ||
if soup.title == self.NITTER_ERROR_TITLE: | if soup.title == self.NITTER_ERROR_TITLE: | ||
logger.warning(account_str + 'は実在の人物ではありませんでした') | |||
else: | else: | ||
logger.info('最終的に出会ったのが@' + account_str + 'だった。') | |||
return account_str | return account_str | ||
def _get_query(self) -> None: | def _get_query(self) -> None: | ||
765行目: | 980行目: | ||
取得したクエリは ``self.query_strs`` に加えられる。 | 取得したクエリは ``self.query_strs`` に加えられる。 | ||
""" | """ | ||
logger.info('検索クエリを入れるナリ。複数ある時は一つの塊ごとに入力するナリ。空欄を入れると検索開始ナリ。') | |||
logger.info('例:「無能」→改行→「脱糞」→改行→「弟殺し」→改行→空欄で改行') | |||
query_input: str = input() | query_input: str = input() | ||
# 空欄が押されるまでユーザー入力受付 | |||
while query_input != '': | while query_input != '': | ||
self.query_strs.append(query_input) | self.query_strs.append(query_input) | ||
query_input = input() | query_input = input() | ||
logger.info('クエリのピースが埋まっていく。') | |||
def _fail(self) -> NoReturn: | def _fail(self) -> NoReturn: | ||
779行目: | 994行目: | ||
取得に成功した分だけファイルにダンプし、プログラムを終了する。 | 取得に成功した分だけファイルにダンプし、プログラムを終了する。 | ||
""" | """ | ||
logger.critical('接続失敗しすぎで強制終了ナリ') | |||
if | if self._table_builder.count > 0: # 取得成功したデータがあれば発行 | ||
logger.critical('取得成功した分だけ発行しますを') | |||
self._table_builder.dump_file() | |||
exit(1) | |||
exit( | |||
def _stop_word(self) -> str: | def _stop_word(self) -> str: | ||
814行目: | 1,006行目: | ||
str: ツイートの記録を中断するための文。ツイート内容の一文がその文と一致したら記録を中断する。 | str: ツイートの記録を中断するための文。ツイート内容の一文がその文と一致したら記録を中断する。 | ||
""" | """ | ||
logger.info( | |||
end_str: Final[str] = input() | f'ここにツイートの本文を入れる!すると・・・・なんとそれを含むツイートで記録を中断する!(これは本当の仕様です。)ちなみに、空欄だと{self.LIMIT_N_TWEETS}件まで終了しない。') | ||
end_str: Final[str] = input() | |||
return end_str | return end_str | ||
def _download_media(self, media_name: | def _download_media( | ||
self, | |||
media_name: str, | |||
accessor: AccessorHandler) -> bool: | |||
"""ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | """ツイートの画像をダウンロードし、:const:`~MEDIA_DIR` に保存する。 | ||
Args: | Args: | ||
media_name | media_name str: 画像ファイル名。Nitter上のimgタグのsrc属性では、``/pic/media%2F`` に後続する。 | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
832行目: | 1,028行目: | ||
image_bytes: Final[bytes | None] = accessor.request_image(url) | image_bytes: Final[bytes | None] = accessor.request_image(url) | ||
if image_bytes is not None: | if image_bytes is not None: | ||
with open(os.path.join(self.MEDIA_DIR, media_name), | with open(os.path.join(self.MEDIA_DIR, media_name), 'wb') as f: | ||
f.write(image_bytes) | f.write(image_bytes) | ||
return True | return True | ||
838行目: | 1,034行目: | ||
return False | return False | ||
def _tweet_date(self, tweet: | def _tweet_date(self, tweet: Tag) -> datetime: | ||
"""ツイートの時刻を取得する。 | """ツイートの時刻を取得する。 | ||
Args: | Args: | ||
tweet | tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | ||
Returns: | Returns: | ||
datetime: ツイートの時刻。 | datetime: ツイートの時刻。 | ||
""" | """ | ||
tweet_date: Tag | NavigableString | None = tweet.find( | |||
date: datetime = datetime.strptime(date_str, '%b %d, %Y · %I:%M %p %Z').replace(tzinfo=ZoneInfo('UTC')).astimezone(ZoneInfo('Asia/Tokyo')) | class_='tweet-date') | ||
assert isinstance(tweet_date, Tag) | |||
tweet_date_a: Tag | None = tweet_date.a | |||
assert tweet_date_a is not None | |||
date_str: str | list[str] | None = tweet_date_a.get('title') | |||
assert isinstance(date_str, str) | |||
date: datetime = datetime.strptime( | |||
date_str, | |||
'%b %d, %Y · %I:%M %p %Z').replace( | |||
tzinfo=ZoneInfo('UTC')).astimezone( | |||
ZoneInfo('Asia/Tokyo')) | |||
return date | return date | ||
def _get_tweet_media( | |||
def | self, | ||
tweet: Tag, | |||
accessor: AccessorHandler) -> str: | |||
self | |||
"""ツイートの画像や動画を取得する。 | """ツイートの画像や動画を取得する。 | ||
Args: | Args: | ||
tweet | tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
881行目: | 1,070行目: | ||
str: Wiki記法でのファイルへのリンクの文字列。 | str: Wiki記法でのファイルへのリンクの文字列。 | ||
""" | """ | ||
tweet_media: | tweet_media: Tag | None = tweet.select_one( | ||
'.tweet-body > .attachments') # 引用リツイート内のメディアを選択しないように.tweet-body直下の.attachmentsを選択 | |||
media_txt: str = '' | media_txt: str = '' | ||
if tweet_media is not None: | if tweet_media is not None: | ||
888行目: | 1,078行目: | ||
for image_a in tweet_media.select('.attachment.image a'): | for image_a in tweet_media.select('.attachment.image a'): | ||
try: | try: | ||
media_name: str = [group for group in re.search(r'%2F([^%]*\.jpg)|%2F([^%]*\.jpeg)|%2F([^%]*\.png)|%2F([^%]*\.gif)', image_a.get('href')).groups() if group][0] | media_name: str = [ | ||
media_list.append(f | group for group in re.search( | ||
r'%2F([^%]*\.jpg)|%2F([^%]*\.jpeg)|%2F([^%]*\.png)|%2F([^%]*\.gif)', | |||
image_a.get('href')).groups() if group][0] | |||
media_list.append(f'[[ファイル:{media_name}|240px]]') | |||
if self._download_media(media_name, accessor): | if self._download_media(media_name, accessor): | ||
logger.info( | |||
os.path.join( | |||
self.MEDIA_DIR, | |||
media_name) | |||
+ ' をアップロードしなければない。') | |||
else: | else: | ||
logger.info( | |||
urljoin( | |||
'https://pbs.twimg.com/media/', | |||
media_name) | |||
+ ' をアップロードしなければない。') | |||
except AttributeError: | except AttributeError: | ||
tweet_url: str = urljoin(self.TWITTER_URL, re.sub('#[^#]*$', '', tweet.find(class_='tweet-link').get('href'))) | tweet_url: str = urljoin( | ||
self.TWITTER_URL, | |||
media_list.append( | re.sub( | ||
'#[^#]*$', | |||
'', | |||
tweet.find( | |||
class_='tweet-link').get('href'))) | |||
logger.error(f'{tweet_url}の画像が取得できませんでしたを 当職無能') | |||
media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]') | |||
# ツイートの動画の取得。ffmpegが入っていなければ手動で取得すること | # ツイートの動画の取得。ffmpegが入っていなければ手動で取得すること | ||
for i, video_container in enumerate(tweet_media.select('.attachment.video-container')): | for i, video_container in enumerate( | ||
tweet_url: str = urljoin(self.TWITTER_URL, re.sub('#[^#]*$', '', tweet.find(class_='tweet-link').get('href'))) | tweet_media.select('.attachment.video-container')): | ||
tweet_url: str = urljoin( | |||
self.TWITTER_URL, | |||
re.sub( | |||
'#[^#]*$', | |||
'', | |||
tweet.find( | |||
class_='tweet-link').get('href'))) # ツイートのURL作成 | |||
video = video_container.select_one('video') | video = video_container.select_one('video') | ||
if video is None: | if video is None: | ||
logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能') | |||
media_list.append( | media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]') | ||
continue | continue | ||
if subprocess.run(['which', 'ffmpeg'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode != 0: | if subprocess.run(['which', | ||
'ffmpeg'], | |||
media_list.append( | stdout=subprocess.DEVNULL, | ||
else: # ffmpegがある場合 | stderr=subprocess.DEVNULL).returncode != 0: | ||
media_url: str = unquote(re.search(r'[^\/]+$', video.get('data-url')).group(0)) | logger.error(f'ffmpegがないため{tweet_url}の動画が取得できませんでしたを') | ||
media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]') | |||
else: # ffmpegがある場合 | |||
# TODO: ブロックが大きすぎるので別メソッドに切り出す | |||
media_url: str = unquote( | |||
re.search( | |||
r'[^\/]+$', | |||
video.get('data-url')).group(0)) | |||
tweet_id: str = tweet_url.split('/')[-1] | tweet_id: str = tweet_url.split('/')[-1] | ||
# 動画のダウンロード | # 動画のダウンロード | ||
if accessor.proxies is not None: | if accessor.proxies is not None: | ||
returncode: int = subprocess.run([ | returncode: int = subprocess.run( | ||
[ | |||
'ffmpeg', '-y', '-http_proxy', | |||
'accessor.proxies["http"]', '-i', | |||
urljoin(self.NITTER_INSTANCE, media_url), | |||
'-c', 'copy', | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts' | |||
], | |||
stdout=subprocess.DEVNULL).returncode | |||
else: | else: | ||
returncode: int = subprocess.run([ | returncode: int = subprocess.run( | ||
[ | |||
'ffmpeg', '-y', '-i', | |||
urljoin(self.NITTER_INSTANCE, media_url), | |||
'-c', 'copy', | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts' | |||
], | |||
stdout=subprocess.DEVNULL).returncode | |||
# 取得成功したらtsをmp4に変換 | # 取得成功したらtsをmp4に変換 | ||
if returncode == 0: | if returncode == 0: | ||
ts2mp4_returncode: int = subprocess.run([ | ts2mp4_returncode: int = subprocess.run( | ||
[ | |||
'ffmpeg', '-y', '-i', | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.ts', | |||
'-acodec', 'copy', '-vcodec', 'copy', | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4' | |||
], | |||
stdout=subprocess.DEVNULL).returncode | |||
if ts2mp4_returncode == 0: | if ts2mp4_returncode == 0: | ||
logger.info( | |||
f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.mp4をアップロードしなければない。') | |||
else: | else: | ||
logger.info( | |||
media_list.append(f | f'{os.path.join(self.MEDIA_DIR, tweet_id)}_{i}.tsをmp4に変換してアップロードしなければない。') | ||
media_list.append(f'[[ファイル:{tweet_id}_{i}.mp4|240px]]') | |||
else: | else: | ||
logger.error(f'{tweet_url}の動画が取得できませんでしたを 当職無能') | |||
media_list.append( | media_list.append('[[ファイル:(動画の取得ができませんでした)|240px]]') | ||
media_txt = ' '.join(media_list) | media_txt = ' '.join(media_list) | ||
return media_txt | return media_txt | ||
def _get_tweet_quote(self, tweet: | def _get_tweet_quote( | ||
self, | |||
tweet: Tag, | |||
accessor: AccessorHandler) -> str: | |||
"""引用リツイートの引用元へのリンクを取得する。 | """引用リツイートの引用元へのリンクを取得する。 | ||
Args: | Args: | ||
tweet | tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
942行目: | 1,190行目: | ||
str: Archiveテンプレートでラップされた引用元ツイートへのリンク。 | str: Archiveテンプレートでラップされた引用元ツイートへのリンク。 | ||
""" | """ | ||
tweet_quote: Final[ | tweet_quote: Final[Tag | None] = tweet.select_one( | ||
'.tweet-body > .quote.quote-big') # 引用リツイートを選択 | |||
quote_txt: str = '' | quote_txt: str = '' | ||
if tweet_quote is not None: | if tweet_quote is not None: | ||
949行目: | 1,198行目: | ||
link = urljoin(self.TWITTER_URL, link) | link = urljoin(self.TWITTER_URL, link) | ||
quote_txt = self._archive_url(link, accessor) | quote_txt = self._archive_url(link, accessor) | ||
tweet_quote_unavailable: Final[ | tweet_quote_unavailable: Final[Tag | None] = tweet.select_one( | ||
'.tweet-body > .quote.unavailable') # 引用リツイートを選択 | |||
if tweet_quote_unavailable is not None: | if tweet_quote_unavailable is not None: | ||
quote_txt = '(引用元が削除されました)' | quote_txt = '(引用元が削除されました)' | ||
return quote_txt | return quote_txt | ||
def _get_tweet_poll(self, tweet: | def _get_tweet_poll(self, tweet: Tag) -> str: | ||
"""ツイートの投票結果を取得する。 | """ツイートの投票結果を取得する。 | ||
Args: | Args: | ||
tweet | tweet Tag: ツイート内容である ``.timeline-item`` タグを表すbeautifulsoup4タグ。 | ||
Returns: | Returns: | ||
str: Wiki形式に書き直した投票結果。 | str: Wiki形式に書き直した投票結果。 | ||
""" | """ | ||
tweet_poll: Final[ | tweet_poll: Final[Tag | None] = tweet.select_one( | ||
'.tweet-body > .poll') | |||
poll_txt: str = '' | poll_txt: str = '' | ||
if tweet_poll is not None: | if tweet_poll is not None: | ||
poll_meters: Final[ | poll_meters: Final[ResultSet[Tag]] = tweet_poll.select( | ||
'.poll-meter') | |||
for poll_meter in poll_meters: | for poll_meter in poll_meters: | ||
ratio: str = poll_meter.select_one('.poll-choice-value').text | ratio: str = poll_meter.select_one('.poll-choice-value').text | ||
if 'leader' in poll_meter['class']: | if 'leader' in poll_meter['class']: | ||
poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgba(29, 155, 240, 0.58) 0 {ratio}, transparent {ratio} 100%); font-weight: bold;">' + ratio + ' ' + poll_meter.select_one('.poll-choice-option').text + '</span>' | poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgba(29, 155, 240, 0.58) 0 {ratio}, transparent {ratio} 100%); font-weight: bold;">' + ratio + ' ' + poll_meter.select_one( | ||
'.poll-choice-option').text + '</span>' | |||
else: | else: | ||
poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgb(207, 217, 222) 0 {ratio}, transparent {ratio} 100%);">' + ratio + ' ' + poll_meter.select_one('.poll-choice-option').text + '</span>' | poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgb(207, 217, 222) 0 {ratio}, transparent {ratio} 100%);">' + ratio + ' ' + poll_meter.select_one( | ||
poll_txt += '<br>\n <span style="font-size: small;">' + tweet_poll.select_one('.poll-info').text + '</span>' | '.poll-choice-option').text + '</span>' | ||
poll_txt += '<br>\n <span style="font-size: small;">' + \ | |||
tweet_poll.select_one('.poll-info').text + '</span>' | |||
return poll_txt | return poll_txt | ||
def _get_timeline_items(self, soup: BeautifulSoup) -> list[ | def _get_timeline_items(self, | ||
soup: BeautifulSoup) -> list[Tag]: | |||
"""タイムラインのツイートを取得。 | """タイムラインのツイートを取得。 | ||
985行目: | 1,241行目: | ||
Returns: | Returns: | ||
list[ | list[Tag]: ツイートのアイテムである ``.timeline-item`` タグを表すTagオブジェクトのリスト。 | ||
""" | """ | ||
timeline_item_list: list[ | timeline_item_list: list[Tag] = [] | ||
for item_or_list in soup.select('.timeline > .timeline-item, .timeline > .thread-line'): | for item_or_list in soup.select( | ||
'.timeline > .timeline-item, .timeline > .thread-line'): | |||
if 'unavailable' in item_or_list.attrs['class']: | if 'unavailable' in item_or_list.attrs['class']: | ||
continue | continue | ||
1,007行目: | 1,264行目: | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
""" | """ | ||
soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser') | soup: Final[BeautifulSoup] = BeautifulSoup( | ||
tweets: Final[list[ | self._page, 'html.parser') # beautifulsoupでレスポンス解析 | ||
for tweet in tweets: | tweets: Final[list[Tag]] = self._get_timeline_items( | ||
if tweet.a.text == self.NEWEST: | soup) # 一ツイートのブロックごとにリストで取得 | ||
for tweet in tweets: # 一ツイート毎に処理 | |||
if tweet.a.text == self.NEWEST: | |||
# Load Newestのボタンは処理しない | |||
continue | continue | ||
if tweet.find(class_='retweet-header') is not None: | if tweet.find(class_='retweet-header') is not None: | ||
# retweet-headerはリツイートを示すので入っていれば処理しない | |||
continue | continue | ||
if tweet.find(class_='pinned') is not None: | if tweet.find(class_='pinned') is not None: | ||
# pinnedは固定ツイートを示すので入っていれば処理しない | |||
continue | continue | ||
if len(self.query_strs) > 0: # クエリが指定されている場合、一つでも含まないツイートは処理しない、TODO: 未テスト | if len(self.query_strs) > 0: | ||
# クエリが指定されている場合、一つでも含まないツイートは処理しない、TODO: 未テスト | |||
not_match: bool = False | not_match: bool = False | ||
for query_str in query_strs: | for query_str in self.query_strs: | ||
if query_str not in tweet.text: | if query_str not in tweet.text: | ||
not_match = True | not_match = True | ||
1,025行目: | 1,288行目: | ||
continue | continue | ||
tweet_url: str = urljoin(self.TWITTER_URL, re.sub('#[^#]*$', '', tweet.find(class_='tweet-link').get('href'))) | tweet_url: str = urljoin( | ||
self.TWITTER_URL, | |||
re.sub( | |||
'#[^#]*$', | |||
'', | |||
tweet.find( | |||
class_='tweet-link').get('href'))) | |||
date: datetime = self._tweet_date(tweet) | date: datetime = self._tweet_date(tweet) | ||
self._table_builder.next_day_if_necessary(date) | |||
archived_tweet_url: str = self._callinshowlink_url( | |||
archived_tweet_url: str = self._callinshowlink_url(tweet_url, accessor) | tweet_url, accessor) | ||
tweet_content: | tweet_content: Tag = tweet.find( | ||
self._archive_soup(tweet_content, accessor) | class_='tweet-content media-body') | ||
media_txt: str = self._get_tweet_media(tweet, accessor) | self._archive_soup(tweet_content, accessor) | ||
quote_txt: str = self._get_tweet_quote(tweet, accessor) | media_txt: str = self._get_tweet_media(tweet, accessor) | ||
poll_txt: str = self._get_tweet_poll(tweet) | quote_txt: str = self._get_tweet_quote(tweet, accessor) | ||
self. | poll_txt: str = self._get_tweet_poll(tweet) | ||
self._table_builder.append( | |||
archived_tweet_url, '<br>\n'.join( | |||
filter( | |||
None, [ | |||
self._table_builder.escape_wiki_reserved_words( | |||
tweet_content.get_text()), quote_txt, media_txt, poll_txt]))) | |||
if self._table_builder.count % self.REPORT_INTERVAL == 0: | |||
logger.info( | |||
f'ツイートを{self._table_builder.count}件も記録したンゴwwwwwwwwwww') | |||
if self._stop != '' and self._stop in tweet_content.get_text(): # 目的ツイートか判定 | |||
logger.info('目的ツイート発見でもう尾張屋根') | |||
self._table_builder.dump_file() | |||
if self._table_builder.count >= self.LIMIT_N_TWEETS: | |||
logger.info(f'{self.LIMIT_N_TWEETS}件も記録している。もうやめにしませんか。') | |||
self._table_builder.dump_file() | |||
def _archive_soup(self, tag: | def _archive_soup( | ||
self, | |||
tag: Tag, | |||
accessor: AccessorHandler) -> None: | |||
"""ツイート内のaタグをテンプレートArchiveの文字列に変化させる。 | """ツイート内のaタグをテンプレートArchiveの文字列に変化させる。 | ||
1,105行目: | 1,331行目: | ||
Args: | Args: | ||
tag | tag Tag: ツイート内容の本文である ``.media-body`` タグを表すbeautifulsoup4タグ。 | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
""" | """ | ||
urls_in_tweet: Final[ | urls_in_tweet: Final[ResultSet[Tag]] = tag.find_all( | ||
'a') | |||
for url in urls_in_tweet: | for url in urls_in_tweet: | ||
href: str | list[str] | None = url.get('href') | |||
if | assert isinstance(href, str) | ||
#Nitter上のTwitterへのリンクを直す | |||
url_link: str = | if href.startswith('https://') or href.startswith('http://'): | ||
url_link = re.sub('\?.*$', '', url_link) | # 先頭にhttpが付いていない物はハッシュタグの検索ページへのリンクなので処理しない | ||
url.replace_with(self._archive_url(url_link, accessor)) | if href.startswith('https' + self.NITTER_INSTANCE[4:]): | ||
elif | # Nitter上のTwitterへのリンクを直す | ||
#Nitter上のTwitterへのリンクを直す | url_link: str = href.replace( | ||
url_link: str = | 'https' + self.NITTER_INSTANCE[4:], self.TWITTER_URL) | ||
url_link = re.sub('\?.*$', '', url_link) | url_link = re.sub('\\?.*$', '', url_link) | ||
url.replace_with(self._archive_url(url_link, accessor)) | url.replace_with(self._archive_url(url_link, accessor)) | ||
elif self._invidious_pattern.search( | elif href.startswith('https://nitter.kavin.rocks/'): | ||
#Nitter上のYouTubeへのリンクをInvidiousのものから直す | # Nitter上のTwitterへのリンクを直す | ||
url_link: str = href.replace( | |||
if re.match('https://[^/]+/[^/]+/', | 'https://nitter.kavin.rocks/', self.TWITTER_URL) | ||
url_link = re.sub('\\?.*$', '', url_link) | |||
url.replace_with(self._archive_url(url_link, accessor)) | |||
elif self._invidious_pattern.search(href): | |||
# Nitter上のYouTubeへのリンクをInvidiousのものから直す | |||
if re.match( | |||
'https://[^/]+/[^/]+/', | |||
href) or re.search( | |||
'/@[^/]*$', | |||
href): | |||
href = self._invidious_pattern.sub('youtube.com', href) | |||
else: | else: | ||
href = self._invidious_pattern.sub('youtu.be', href) | |||
url.replace_with(self._archive_url( | url.replace_with(self._archive_url(href, accessor)) | ||
elif | elif href.startswith('https://bibliogram.art/'): | ||
# Nitter上のInstagramへのリンクをBibliogramのものから直す | # Nitter上のInstagramへのリンクをBibliogramのものから直す | ||
# Bibliogramは中止されたようなのでそのうちリンクが変わるかも | # Bibliogramは中止されたようなのでそのうちリンクが変わるかも | ||
url_link: str = | url_link: str = href.replace( | ||
url.replace_with(self._archive_url(url_link, accessor)) | 'https://bibliogram.art/', | ||
'https://www.instagram.com/') | |||
url.replace_with(self._archive_url(url_link, accessor)) | |||
else: | else: | ||
url.replace_with(self._archive_url( | url.replace_with(self._archive_url(href, accessor)) | ||
elif url.text.startswith('@'): | elif url.text.startswith('@'): | ||
url_link: str = urljoin(self.TWITTER_URL, | url_link: str = urljoin(self.TWITTER_URL, href) | ||
url_text: str = url.text | url_text: str = url.text | ||
url.replace_with(self._archive_url(url_link, accessor, url_text)) | url.replace_with( | ||
self._archive_url( | |||
url_link, accessor, url_text)) | |||
def _archive_url(self, url: | def _archive_url( | ||
self, | |||
url: str, | |||
accessor: AccessorHandler, | |||
text: str | None = None) -> str: | |||
"""URLをArchiveテンプレートでラップする。 | """URLをArchiveテンプレートでラップする。 | ||
1,147行目: | 1,391行目: | ||
Args: | Args: | ||
url | url str: ラップするURL。 | ||
accessor AccessorHandler: | accessor AccessorHandler: アクセスハンドラ。 | ||
text | text str|None: ArchiveテンプレートでURLの代わりに表示する文字列。 | ||
Returns: | Returns: | ||
str: ArchiveタグでラップしたURL。 | str: ArchiveタグでラップしたURL。 | ||
""" | """ | ||
if '#' in url: # フラグメント識別子の処理 | if '#' in url: # フラグメント識別子の処理 | ||
main_url, fragment = url.split('#', maxsplit=1) | main_url, fragment = url.split('#', maxsplit=1) | ||
return self._table_builder.archive_url( | |||
url, self._archive(main_url, accessor) + '#' + fragment, text) | |||
else: | else: | ||
return self._table_builder.archive_url( | |||
url, self._archive(url, accessor), text) | |||
def _callinshowlink_url(self, url: | def _callinshowlink_url(self, url: str, accessor: AccessorHandler) -> str: | ||
"""URLをCallinShowLinkテンプレートでラップする。 | """URLをCallinShowLinkテンプレートでラップする。 | ||
Args: | Args: | ||
url | url str: ラップするURL。 | ||
accessor AccessorHandler: | accessor AccessorHandler: アクセスハンドラ。 | ||
Returns: | Returns: | ||
str: CallinShowLinkタグでラップしたURL。 | str: CallinShowLinkタグでラップしたURL。 | ||
""" | """ | ||
return | return self._table_builder.callinshowlink_url( | ||
url, self._archive(url, accessor)) | |||
def _archive(self, url: | def _archive(self, url: str, accessor: AccessorHandler) -> str: | ||
"""URLから対応するarchive.todayのURLを返す。 | """URLから対応するarchive.todayのURLを返す。 | ||
1,186行目: | 1,427行目: | ||
Args: | Args: | ||
url | url str: 魚拓を取得するURL。 | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
1,192行目: | 1,433行目: | ||
str: 魚拓のURL。 | str: 魚拓のURL。 | ||
""" | """ | ||
archive_url: str = urljoin(self.ARCHIVE_TODAY_STANDARD, quote(unquote(url), safe='&=+?%')) | archive_url: str = urljoin( | ||
res: Final[str | None] = accessor.request(urljoin(self.ARCHIVE_TODAY, quote(unquote(url), safe='&=+?%'))) | self.ARCHIVE_TODAY_STANDARD, | ||
if res is None: | quote( | ||
unquote(url), | |||
safe='&=+?%')) # wikiに載せるとき用URLで失敗するとこのままhttps://archive.ph/https%3A%2F%2Fxxxxxxxxの形で返される | |||
res: Final[str | None] = accessor.request(urljoin( | |||
self.ARCHIVE_TODAY, quote(unquote(url), safe='&=+?%'))) | |||
if res is None: # 魚拓接続失敗時処理 | |||
logger.error(archive_url + 'にアクセス失敗ナリ。出力されるテキストにはそのまま記載されるナリ。') | |||
else: | else: | ||
soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') | soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') | ||
content: | content: Tag = soup.find( | ||
if content is None or content.get_text()[:len(self.NO_ARCHIVE)] == self.NO_ARCHIVE: | id='CONTENT') # archive.todayの魚拓一覧ページの中身だけ取得 | ||
if content is None or content.get_text()[:len( | |||
self.NO_ARCHIVE)] == self.NO_ARCHIVE: # 魚拓があるかないか判定 | |||
logger.warning(url + 'の魚拓がない。これはいけない。') | |||
else: | else: | ||
archive_url = content.find('a').get('href').replace(self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD) | archive_url = content.find('a').get('href').replace( | ||
self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD) | |||
return archive_url | return archive_url | ||
1,213行目: | 1,462行目: | ||
accessor AccessorHandler: アクセスハンドラ | accessor AccessorHandler: アクセスハンドラ | ||
""" | """ | ||
soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser') | soup: Final[BeautifulSoup] = BeautifulSoup(self._page, 'html.parser') | ||
show_mores: Final[ | show_mores: Final[ResultSet[Tag] | ||
new_url: str = '' # ここで定義しないと動かなくなった、FIXME? | ] = soup.find_all(class_='show-more') | ||
for show_more in show_mores: | new_url: str = '' # ここで定義しないと動かなくなった、FIXME? | ||
if show_more.text != self.NEWEST: | for show_more in show_mores: # show-moreに次ページへのリンクか前ページへのリンクがある | ||
new_url = urljoin(self.NITTER_INSTANCE, self. | if show_more.text != self.NEWEST: # 前ページへのリンクではないか判定 | ||
res: Final[str | None] = accessor.request(new_url) | new_url = urljoin( | ||
self.NITTER_INSTANCE, | |||
self._name | |||
+ '/' | |||
+ self.TWEETS_OR_REPLIES | |||
+ show_more.a.get('href')) # 直下のaタグのhrefの中身取ってURL頭部分と合体 | |||
res: Final[str | None] = accessor.request(new_url) | |||
if res is None: | if res is None: | ||
self._fail() | self._fail() | ||
new_page_soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') | new_page_soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') | ||
if new_page_soup.find(class_= | if new_page_soup.find( | ||
class_='timeline-end') is None: | |||
self._page = res | # ツイートの終端ではtimeline-endだけのページになるので判定 | ||
logger.info(new_url + 'に移動しますを') | |||
self._page = res # まだ残りツイートがあるのでページを返して再度ツイート本文収集 | |||
else: | else: | ||
logger.info('急に残りツイートが無くなったな終了するか') | |||
self. | self._table_builder.dump_file() | ||
def execute(self, krsw: bool=False, use_browser: bool=True, enable_javascript: bool=True) -> NoReturn: | def execute(self, krsw: bool = False, use_browser: bool = True, | ||
enable_javascript: bool = True) -> NoReturn: | |||
"""通信が必要な部分のロジック。 | """通信が必要な部分のロジック。 | ||
1,243行目: | 1,501行目: | ||
self._check_nitter_instance(accessor) | self._check_nitter_instance(accessor) | ||
self._check_archive_instance(accessor) | self._check_archive_instance(accessor) | ||
# Invidiousのインスタンスリストの正規表現パターンを取得 | |||
invidious_url_tuple: Final[tuple[str]] = self._invidious_instances(accessor) | invidious_url_tuple: Final[tuple[str, ...] | ||
self._invidious_pattern: | ] = self._invidious_instances(accessor) | ||
self._invidious_pattern: re.Pattern[str] = re.compile( | |||
'|'.join(invidious_url_tuple)) | |||
# 検索クエリの設定 | # 検索クエリの設定 | ||
1,257行目: | 1,517行目: | ||
if __name__ == '__main__': | if __name__ == '__main__': | ||
if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < | if sys.version_info.major < 3 or ( | ||
sys.version_info.major == 3 and sys.version_info.minor < 11): | |||
logger.critical('Pythonのバージョンを3.11以上に上げて下さい') | |||
exit(1) | exit(1) | ||
parser: ArgumentParser = ArgumentParser() | parser: ArgumentParser = ArgumentParser() | ||
parser.add_argument( | parser.add_argument( | ||
parser.add_argument( | '--krsw', | ||
parser.add_argument( | action='store_true', | ||
help='指定すると、パカデブのツイートを取得上限数まで取得する。') | |||
parser.add_argument( | |||
'-n', | |||
'--no_browser', | |||
action='store_true', | |||
help='指定すると、Tor Browserを利用しない。') | |||
parser.add_argument( | |||
'-d', | |||
'--disable_script', | |||
action='store_true', | |||
help='指定すると、Tor BrowserでJavaScriptを利用しない。') | |||
args: Namespace = parser.parse_args() | args: Namespace = parser.parse_args() | ||
twitter_archiver: TwitterArchiver = TwitterArchiver() | twitter_archiver: TwitterArchiver = TwitterArchiver() | ||
twitter_archiver.execute(args.krsw, not args. | twitter_archiver.execute( | ||
args.krsw, | |||
not args.no_browser, | |||
not args.disable_script) | |||
</syntaxhighlight> | </syntaxhighlight> | ||