→コード: v4.1.1 --search-unarchivedでリツイートを除く処理を追加
>Fet-Fe (→コード: v4.1.0 wikiに掲載されていないツイートのURLをarchive.isから取得する機能を追加) |
>Fet-Fe (→コード: v4.1.1 --search-unarchivedでリツイートを除く処理を追加) |
||
7行目: | 7行目: | ||
"""Twitter自動収集スクリプト | """Twitter自動収集スクリプト | ||
ver4.1. | ver4.1.1 2023/10/22恒心 | ||
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です | 当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です | ||
22行目: | 22行目: | ||
$ python3 (ファイル名) --krsw | $ python3 (ファイル名) --krsw | ||
自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。 | 自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。 | ||
つまりユーザー入力が要りません。 | つまりユーザー入力が要りません。 | ||
``--no-browser`` オプションでTor Browserを使用しないモードに、 | |||
``--disable-script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。 | |||
``--search-unarchived`` オプションでは、`archive.today <https://archive.today>`_ から | |||
Wikiに未掲載のツイートのURLを収集するモードになります。 | |||
Note: | Note: | ||
36行目: | 35行目: | ||
* 環境は玉葱前提です。 | * 環境は玉葱前提です。 | ||
* | * TailsやWhonixでない場合、Tor Browserを入れておくか、torコマンドでプロキシを立てておくことが必要です。 | ||
* Whonix-Workstation, MacOSで動作確認済 | * Whonix-Workstation, MacOSで動作確認済 | ||
68行目: | 67行目: | ||
from enum import Enum | from enum import Enum | ||
from logging import Logger, getLogger | from logging import Logger, getLogger | ||
from re import Match | from re import Match, Pattern | ||
from time import sleep | from time import sleep | ||
from types import MappingProxyType, TracebackType | from types import MappingProxyType, TracebackType | ||
79行目: | 78行目: | ||
from bs4.element import NavigableString, ResultSet, Tag | from bs4.element import NavigableString, ResultSet, Tag | ||
from selenium import webdriver | from selenium import webdriver | ||
from selenium.common.exceptions import | from selenium.common.exceptions import WebDriverException | ||
from selenium.webdriver.common.by import By | from selenium.webdriver.common.by import By | ||
from selenium.webdriver.firefox.options import Options as FirefoxOptions | from selenium.webdriver.firefox.options import Options as FirefoxOptions | ||
336行目: | 334行目: | ||
enable_javascript (bool): JavaScriptを有効にするかどうか。reCAPTCHA対策に必要。 | enable_javascript (bool): JavaScriptを有効にするかどうか。reCAPTCHA対策に必要。 | ||
""" | """ | ||
self. | self._options: Final[FirefoxOptions] = FirefoxOptions() | ||
self._options.binary_location = ( | |||
self.TOR_BROWSER_PATHS[platform.system()] | |||
) | |||
if enable_javascript: | if enable_javascript: | ||
logger.warning('reCAPTCHA対策のためJavaScriptをonにしますを') | logger.warning('reCAPTCHA対策のためJavaScriptをonにしますを') | ||
self._options.preferences.update({ # type: ignore | |||
'javascript.enabled': enable_javascript, | 'javascript.enabled': enable_javascript, | ||
'intl.accept_languages': 'en-US, en', | 'intl.accept_languages': 'en-US, en', | ||
351行目: | 349行目: | ||
'dom.webdriver.enabled': False # 自動操縦と見破られないための設定 | 'dom.webdriver.enabled': False # 自動操縦と見破られないための設定 | ||
}) | }) | ||
self._refresh_browser() | |||
def quit(self) -> None: | |||
"""Seleniumドライバを終了する。 | |||
""" | |||
if hasattr(self, '_driver'): | |||
logger.debug('ブラウザ終了') | |||
self._driver.quit() | |||
def _refresh_browser(self) -> None: | |||
"""ブラウザを起動する。 | |||
""" | |||
try: | try: | ||
self._driver: | logger.debug('ブラウザ起動') | ||
options= | self._driver: webdriver.Firefox = webdriver.Firefox( | ||
options=self._options) | |||
sleep(1) | sleep(1) | ||
wait_init: Final[WebDriverWait] = WebDriverWait( | wait_init: Final[WebDriverWait] = WebDriverWait( | ||
368行目: | 378行目: | ||
self.quit() | self.quit() | ||
raise | raise | ||
def _check_recaptcha(self, url: str) -> None: | def _check_recaptcha(self, url: str) -> None: | ||
388行目: | 392行目: | ||
botバレしたときに自動で他のTorサーキットに接続し直す。 | botバレしたときに自動で他のTorサーキットに接続し直す。 | ||
""" | """ | ||
if len(self._driver.find_elements(By.ID, 'g-recaptcha')) > 0: | |||
if self._options.preferences.get('javascript.enabled'): # type: ignore | |||
if self. | |||
logger.warning(f'{url} でreCAPTCHAが要求されたナリ') | logger.warning(f'{url} でreCAPTCHAが要求されたナリ') | ||
print('reCAPTCHAを解いてね(笑)、それはできるよね。') | print('reCAPTCHAを解いてね(笑)、それはできるよね。') | ||
399行目: | 402行目: | ||
ec.staleness_of( | ec.staleness_of( | ||
self._driver.find_element(By.ID, 'g-recaptcha'))) | self._driver.find_element(By.ID, 'g-recaptcha'))) | ||
self._random_sleep() # DoS対策で待つ | |||
self._driver.get(url) | |||
else: | else: | ||
raise ReCaptchaRequiredError( | raise ReCaptchaRequiredError( | ||
f'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: {url}') | f'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: {url}') | ||
def get(self, url: str) -> str: | def get(self, url: str) -> str: | ||
691行目: | 690行目: | ||
return '{|class="wikitable" style="text-align: left;"\n' + text + '|}' | return '{|class="wikitable" style="text-align: left;"\n' + text + '|}' | ||
@ | @classmethod | ||
def escape_wiki_reserved_words(text: str) -> str: | def escape_wiki_reserved_words(cls, text: str) -> str: | ||
"""MediaWikiの文法と衝突する文字を無効化する。 | """MediaWikiの文法と衝突する文字を無効化する。 | ||
735行目: | 734行目: | ||
return text | return text | ||
if not hasattr(cls, '_escape_callables'): | |||
# 初回呼び出しの時だけ正規表現をコンパイルする | |||
patterns: tuple[Pattern[str], ...] = ( | |||
re.compile(r'^ ', re.MULTILINE), | |||
re.compile(r'^([\*#:;])', re.MULTILINE), | |||
re.compile(r'^----', re.MULTILINE) | |||
) | |||
cls._escape_callables: tuple[Callable[[str], str], ...] = ( | |||
lambda t: t.replace('\n', '<br>\n'), | |||
lambda t: patterns[0].sub(' ', t), | |||
lambda t: patterns[1].sub(r'<nowiki>\1</nowiki>', t), | |||
lambda t: patterns[2].sub('<nowiki>----</nowiki>', t) | |||
) | |||
for escape_callable in cls._escape_callables: | |||
text = escape_callable(text) | |||
text = escape_nolink_urls(text) | text = escape_nolink_urls(text) | ||
return text | return text | ||
904行目: | 908行目: | ||
self._check_slash() # スラッシュが抜けてないかチェック | self._check_slash() # スラッシュが抜けてないかチェック | ||
self._has_ffmpeg: Final[bool] = self._check_ffmpeg() # ffmpegがあるかチェック | self._has_ffmpeg: Final[bool] = self._check_ffmpeg() # ffmpegがあるかチェック | ||
self._img_ext_pattern: Final[Pattern[str]] = re.compile( | |||
r'%2F([^%]*\.(?:jpg|jpeg|png|gif))') | |||
self._url_fragment_pattern: Final[Pattern[str]] = re.compile('#[^#]*$') | |||
self._url_query_pattern: Final[Pattern[str]] = re.compile(r'\?.*$') | |||
def _set_queries(self, accessor: AccessorHandler, krsw: bool) -> bool: | def _set_queries(self, accessor: AccessorHandler, krsw: bool) -> bool: | ||
1,253行目: | 1,262行目: | ||
href: str | list[str] | None = image_a.get('href') | href: str | list[str] | None = image_a.get('href') | ||
assert isinstance(href, str) | assert isinstance(href, str) | ||
matched: Match[str] | None = | matched: Match[str] | None = self._img_ext_pattern.search( | ||
href) | href) | ||
assert matched is not None | assert matched is not None | ||
1,341行目: | 1,349行目: | ||
link: str | list[str] | None = quote_link.get('href') | link: str | list[str] | None = quote_link.get('href') | ||
assert isinstance(link, str) | assert isinstance(link, str) | ||
link = | link = self._url_fragment_pattern.sub('', link) | ||
link = urljoin(self.TWITTER_URL, link) | link = urljoin(self.TWITTER_URL, link) | ||
quote_txt = self._archive_url(link, accessor) | quote_txt = self._archive_url(link, accessor) | ||
1,444行目: | 1,452行目: | ||
url_link: str = href.replace( | url_link: str = href.replace( | ||
'https' + self.NITTER_INSTANCE[4:], self.TWITTER_URL) | 'https' + self.NITTER_INSTANCE[4:], self.TWITTER_URL) | ||
url_link = | url_link = self._url_query_pattern.sub('', url_link) | ||
url.replace_with(self._archive_url(url_link, accessor)) | url.replace_with(self._archive_url(url_link, accessor)) | ||
elif href.startswith('https://nitter.kavin.rocks/'): | elif href.startswith('https://nitter.kavin.rocks/'): | ||
1,450行目: | 1,458行目: | ||
url_link: str = href.replace( | url_link: str = href.replace( | ||
'https://nitter.kavin.rocks/', self.TWITTER_URL) | 'https://nitter.kavin.rocks/', self.TWITTER_URL) | ||
url_link = | url_link = self._url_query_pattern.sub('', url_link) | ||
url.replace_with(self._archive_url(url_link, accessor)) | url.replace_with(self._archive_url(url_link, accessor)) | ||
elif (hasattr(self, '_invidious_pattern') | elif (hasattr(self, '_invidious_pattern') | ||
1,607行目: | 1,615行目: | ||
tweet_url: Final[str] = urljoin( | tweet_url: Final[str] = urljoin( | ||
self.TWITTER_URL, | self.TWITTER_URL, | ||
self._url_fragment_pattern.sub('', href)) | |||
tweet_callinshow_template: Final[str] = self._callinshowlink_url( | tweet_callinshow_template: Final[str] = self._callinshowlink_url( | ||
1,783行目: | 1,791行目: | ||
'ユーザー名: @' + self._name + 'で検索しまふ' | 'ユーザー名: @' + self._name + 'で検索しまふ' | ||
) | ) | ||
self._twitter_url_pattern: Pattern[str] = re.compile( | |||
self.TWITTER_URL + self._name + r'/status/\d+') | |||
self._url_list_on_wiki: list[str] = [] | self._url_list_on_wiki: list[str] = [] | ||
1,842行目: | 1,853行目: | ||
a_last_child: Tag | None = tweet.select_one('a:last-child') | a_last_child: Tag | None = tweet.select_one('a:last-child') | ||
assert a_last_child is not None | assert a_last_child is not None | ||
url_matched: Final[Match[str]] | None = | url_matched: Final[Match[str]] | None = ( | ||
self. | self._twitter_url_pattern.match(a_last_child.text) | ||
) | |||
if url_matched is not None: | if url_matched is not None: | ||
a_first_child: Tag | None = tweet.select_one('a:first-child') | a_first_child: Tag | None = tweet.select_one('a:first-child') | ||
1,913行目: | 1,924行目: | ||
Examples: | Examples: | ||
`https://twitter.com/CallinShow/status/1707` で始まるURLをすべて探索する場合 | `https://twitter.com/CallinShow/status/1707` で始まるURLをすべて探索する場合 | ||
:: | :: | ||
1,920行目: | 1,930行目: | ||
`https://twitter.com/CallinShow/status/165` で始まるURLから | `https://twitter.com/CallinShow/status/165` で始まるURLから | ||
`https://twitter.com/CallinShow/status/169` で始まるURLまでをすべて探索する場合 | `https://twitter.com/CallinShow/status/169` で始まるURLまでをすべて探索する場合 | ||
:: | :: | ||
1,974行目: | 1,983行目: | ||
'%Y-%m-%dT%H:%M:%SZ') | '%Y-%m-%dT%H:%M:%SZ') | ||
return raw_time.replace(tzinfo=ZoneInfo('Asia/Tokyo')) | return raw_time.replace(tzinfo=ZoneInfo('Asia/Tokyo')) | ||
def _filter_out_retweets( | |||
self, | |||
url_pairs: list[UrlTuple], | |||
accessor: AccessorHandler) -> list[UrlTuple]: | |||
"""リツイートを除く。 | |||
Args: | |||
url_pairs (list[UrlTuple]): リツイートを除く前のURLのリスト。 | |||
accessor (AccessorHandler): アクセスハンドラ。 | |||
Returns: | |||
list[UrlTuple]: リツイートを除いたURLのリスト。 | |||
""" | |||
filtered_urls: list[UrlTuple] = [] | |||
for url_pair in url_pairs: | |||
page: str | None = accessor.request(url_pair.archive_url) | |||
assert page is not None | |||
soup: BeautifulSoup = BeautifulSoup(page) | |||
if soup.select_one('span[data-testid="socialContext"]') is None: | |||
filtered_urls.append(url_pair) | |||
return filtered_urls | |||
@override | @override | ||
1,993行目: | 2,026行目: | ||
self.TWEET_URL_PREFIX_DEFAULT, | self.TWEET_URL_PREFIX_DEFAULT, | ||
self.INCREMENTED_NUM_DEFAULT) | self.INCREMENTED_NUM_DEFAULT) | ||
# リツイートを除く | |||
filtered_url_list: list[UrlTuple] = self._filter_out_retweets( | |||
self._url_list, accessor) | |||
with codecs.open(self.FILENAME, 'w', 'utf-8') as f: | with codecs.open(self.FILENAME, 'w', 'utf-8') as f: | ||
for url_pair in | for url_pair in filtered_url_list: | ||
f.write(url_pair.url + '\n') | f.write(url_pair.url + '\n') | ||
logger.info('テキストファイル手に入ったやで〜') | logger.info('テキストファイル手に入ったやで〜') | ||
2,012行目: | 2,049行目: | ||
parser.add_argument( | parser.add_argument( | ||
'-n', | '-n', | ||
'-- | '--no-browser', | ||
action='store_true', | action='store_true', | ||
help='指定すると、Tor Browserを利用しない。') | help='指定すると、Tor Browserを利用しない。') | ||
parser.add_argument( | parser.add_argument( | ||
'-d', | '-d', | ||
'-- | '--disable-script', | ||
action='store_true', | action='store_true', | ||
help='指定すると、Tor BrowserでJavaScriptを利用しない。') | help='指定すると、Tor BrowserでJavaScriptを利用しない。') | ||
parser.add_argument( | parser.add_argument( | ||
'-u', | '-u', | ||
'-- | '--search-unarchived', | ||
action='store_true', | action='store_true', | ||
help=('指定すると、Wikiに未掲載のツイートのURLをarchive.todayから収集する。' | help=('指定すると、Wikiに未掲載のツイートのURLをarchive.todayから収集する。' | ||
2,029行目: | 2,066行目: | ||
logger.debug('args: ' + str(args)) | logger.debug('args: ' + str(args)) | ||
twitter_archiver: Final[TwitterArchiver] = ArchiveCrawler() if | twitter_archiver: Final[TwitterArchiver] = ( | ||
ArchiveCrawler() if args.search_unarchived else TwitterArchiver() | |||
) | |||
twitter_archiver.execute( | twitter_archiver.execute( | ||
args.krsw, | args.krsw, |