→コード: v4.0.4 SeleniumAccessor.quit()がself._driverの未定義時にエラーを吐かないよう修正
>Fet-Fe (→コード: v4.0.3 pythonのバージョン指定を3.11以上に変更テーブル組み立てを別のクラスに逃す) |
>Fet-Fe (→コード: v4.0.4 SeleniumAccessor.quit()がself._driverの未定義時にエラーを吐かないよう修正) |
||
7行目: | 7行目: | ||
"""Twitter自動収集スクリプト | """Twitter自動収集スクリプト | ||
ver4.0. | ver4.0.4 2023/9/29恒心 | ||
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です | 当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です | ||
60行目: | 60行目: | ||
from datetime import datetime | from datetime import datetime | ||
from logging import Logger, getLogger | from logging import Logger, getLogger | ||
from re import Match | |||
from time import sleep | from time import sleep | ||
from types import MappingProxyType, TracebackType | from types import MappingProxyType, TracebackType | ||
154行目: | 155行目: | ||
""" | """ | ||
def __init__(self): | def __init__(self) -> None: | ||
"""コンストラクタ。 | """コンストラクタ。 | ||
""" | """ | ||
316行目: | 317行目: | ||
""" | """ | ||
def __init__(self, enable_javascript: bool): | def __init__(self, enable_javascript: bool) -> None: | ||
"""コンストラクタ。 | """コンストラクタ。 | ||
332行目: | 333行目: | ||
logger.warning('reCAPTCHA対策のためJavaScriptをonにしますを') | logger.warning('reCAPTCHA対策のためJavaScriptをonにしますを') | ||
options.preferences.update({ | options.preferences.update({ # type: ignore | ||
'javascript.enabled': enable_javascript, | 'javascript.enabled': enable_javascript, | ||
'intl.accept_languages': 'en-US, en', | 'intl.accept_languages': 'en-US, en', | ||
341行目: | 342行目: | ||
try: | try: | ||
self. | self._driver: webdriver.Firefox = webdriver.Firefox( | ||
options=options) | |||
sleep(1) | sleep(1) | ||
wait_init: WebDriverWait = WebDriverWait(self. | wait_init: WebDriverWait = WebDriverWait(self._driver, | ||
self.WAIT_TIME_FOR_INIT) | self.WAIT_TIME_FOR_INIT) | ||
wait_init.until( | wait_init.until( # type: ignore | ||
ec.element_to_be_clickable((By.ID, 'connectButton')) | ec.element_to_be_clickable((By.ID, 'connectButton')) | ||
) | ) | ||
self. | self._driver.find_element(By.ID, 'connectButton').click() | ||
wait_init.until(ec.url_contains('about:blank')) # | # Torの接続が完了するまで待つ | ||
wait_init.until(ec.url_contains('about:blank')) # type: ignore | |||
self.wait: WebDriverWait = WebDriverWait(self. | self.wait: WebDriverWait = WebDriverWait(self._driver, | ||
self.REQUEST_TIMEOUT) | self.REQUEST_TIMEOUT) | ||
except BaseException: | except BaseException: | ||
360行目: | 363行目: | ||
"""Seleniumドライバを終了する。 | """Seleniumドライバを終了する。 | ||
""" | """ | ||
if self | if hasattr(self, 'driver'): | ||
self. | self._driver.quit() | ||
def _check_recaptcha(self) -> None: | def _check_recaptcha(self) -> None: | ||
373行目: | 376行目: | ||
""" | """ | ||
try: | try: | ||
self. | self._driver.find_element( # 要素がない時に例外を吐く | ||
By.CSS_SELECTOR, | By.CSS_SELECTOR, | ||
'script[src^="https://www.google.com/recaptcha/api.js"]') | 'script[src^="https://www.google.com/recaptcha/api.js"]') | ||
380行目: | 383行目: | ||
logger.warning('botバレしたらNew Tor circuit for this siteを選択するナリよ') | logger.warning('botバレしたらNew Tor circuit for this siteを選択するナリよ') | ||
WebDriverWait( | WebDriverWait( | ||
self. | self._driver, | ||
10000).until( | 10000).until( # type: ignore | ||
ec.staleness_of( | ec.staleness_of( | ||
self. | self._driver.find_element( | ||
By.CSS_SELECTOR, | By.CSS_SELECTOR, | ||
'script[src^="https://www.google.com/recaptcha/api.js"]'))) | 'script[src^="https://www.google.com/recaptcha/api.js"]'))) | ||
390行目: | 393行目: | ||
raise ReCaptchaRequiredError( | raise ReCaptchaRequiredError( | ||
'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: ' | 'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: ' | ||
+ self. | + self._driver.current_url) | ||
except NoSuchElementException: | except NoSuchElementException: | ||
# reCAPTCHAの要素がなければそのまま | # reCAPTCHAの要素がなければそのまま | ||
406行目: | 409行目: | ||
self._random_sleep() # DoS対策で待つ | self._random_sleep() # DoS対策で待つ | ||
try: | try: | ||
self. | self._driver.get(url) | ||
self._check_recaptcha() | self._check_recaptcha() | ||
except WebDriverException as e: | except WebDriverException as e: | ||
# Selenium固有の例外を共通の例外に変換 | # Selenium固有の例外を共通の例外に変換 | ||
raise AccessError(str(e)) from e | raise AccessError(str(e)) from e | ||
return self. | return self._driver.page_source | ||
428行目: | 431行目: | ||
""" | """ | ||
def __init__(self, use_browser: bool, enable_javascript: bool): | def __init__(self, use_browser: bool, enable_javascript: bool) -> None: | ||
"""コンストラクタ。 | """コンストラクタ。 | ||
576行目: | 579行目: | ||
class TableBuilder: | class TableBuilder: | ||
def __init__(self, date: datetime): | def __init__(self, date: datetime) -> None: | ||
self._tables: list[str] = [''] | self._tables: list[str] = [''] | ||
self._count: int = 0 # 記録数 | self._count: int = 0 # 記録数 | ||
613行目: | 616行目: | ||
self._tables[0] = self._convert_to_text_table(self._tables[0]) | self._tables[0] = self._convert_to_text_table(self._tables[0]) | ||
if os.name == 'nt': # Windows | if os.name == 'nt': # Windows | ||
self. | self._tables[0] = self._date.strftime( | ||
'\n=== %#m月%#d日 ===\n') + self. | '\n=== %#m月%#d日 ===\n') + self._tables[0] | ||
logger.info(self._date.strftime('%#m月%#d日のツイートを取得完了ですを')) | logger.info(self._date.strftime('%#m月%#d日のツイートを取得完了ですを')) | ||
else: # Mac or Linux | else: # Mac or Linux | ||
625行目: | 628行目: | ||
def _convert_to_text_table(self, text: str) -> str: | def _convert_to_text_table(self, text: str) -> str: | ||
"""``self. | """``self._tables[0]`` にwikiでテーブル表示にするためのヘッダとフッタをつける。 | ||
Args: | Args: | ||
662行目: | 665行目: | ||
i += 2 | i += 2 | ||
else: | else: | ||
if text[i:i + 10] == '{{Archive|' or text[i:i + 10] == '{{archive|': | if (text[i:i + 10] == '{{Archive|' | ||
or text[i:i + 10] == '{{archive|'): | |||
is_in_archive_template = True | is_in_archive_template = True | ||
i += 10 | i += 10 | ||
807行目: | 811行目: | ||
""" | """ | ||
def __init__(self): | def __init__(self) -> None: | ||
"""コンストラクタ。 | """コンストラクタ。 | ||
""" | """ | ||
1,050行目: | 1,054行目: | ||
date_str: str | list[str] | None = tweet_date_a.get('title') | date_str: str | list[str] | None = tweet_date_a.get('title') | ||
assert isinstance(date_str, str) | assert isinstance(date_str, str) | ||
return datetime.strptime( | |||
date_str, | date_str, | ||
'%b %d, %Y · %I:%M %p %Z').replace( | '%b %d, %Y · %I:%M %p %Z').replace( | ||
tzinfo=ZoneInfo('UTC')).astimezone( | tzinfo=ZoneInfo('UTC')).astimezone( | ||
ZoneInfo('Asia/Tokyo')) | ZoneInfo('Asia/Tokyo')) | ||
def _get_tweet_media( | def _get_tweet_media( | ||
1,078行目: | 1,081行目: | ||
for image_a in tweet_media.select('.attachment.image a'): | for image_a in tweet_media.select('.attachment.image a'): | ||
try: | try: | ||
href: str | list[str] | None = image_a.get('href') | |||
assert isinstance(href, str) | |||
media_name: str = [ | media_name: str = [ | ||
group for group in re.search( | group for group in re.search( | ||
r'%2F([^%]*\.jpg)|%2F([^%]*\.jpeg)|%2F([^%]*\.png)|%2F([^%]*\.gif)', | r'%2F([^%]*\.jpg)|%2F([^%]*\.jpeg)|%2F([^%]*\.png)|%2F([^%]*\.gif)', | ||
href).groups() if group][0] | |||
media_list.append(f'[[ファイル:{media_name}|240px]]') | media_list.append(f'[[ファイル:{media_name}|240px]]') | ||
if self._download_media(media_name, accessor): | if self._download_media(media_name, accessor): | ||
1,096行目: | 1,101行目: | ||
+ ' をアップロードしなければない。') | + ' をアップロードしなければない。') | ||
except AttributeError: | except AttributeError: | ||
tweet_link: Tag | NavigableString | None = tweet.find( | |||
class_='tweet-link') | |||
assert isinstance(tweet_link, Tag) | |||
href: str | list[str] | None = tweet_link.get('href') | |||
assert isinstance(href, str) | |||
tweet_url: str = urljoin( | tweet_url: str = urljoin( | ||
self.TWITTER_URL, | self.TWITTER_URL, | ||
re.sub( | re.sub('#[^#]*$', '', href)) | ||
logger.error(f'{tweet_url}の画像が取得できませんでしたを 当職無能') | logger.error(f'{tweet_url}の画像が取得できませんでしたを 当職無能') | ||
media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]') | media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]') | ||
1,108行目: | 1,114行目: | ||
for i, video_container in enumerate( | for i, video_container in enumerate( | ||
tweet_media.select('.attachment.video-container')): | tweet_media.select('.attachment.video-container')): | ||
tweet_link: Tag | NavigableString | None = tweet.find( | |||
class_='tweet-link') | |||
assert isinstance(tweet_link, Tag) | |||
href: str | list[str] | None = tweet_link.get('href') | |||
assert isinstance(href, str) | |||
tweet_url: str = urljoin( | tweet_url: str = urljoin( | ||
self.TWITTER_URL, | self.TWITTER_URL, | ||
re.sub( | re.sub('#[^#]*$', '', href)) # ツイートのURL作成 | ||
video = video_container.select_one('video') | video = video_container.select_one('video') | ||
if video is None: | if video is None: | ||
1,129行目: | 1,136行目: | ||
else: # ffmpegがある場合 | else: # ffmpegがある場合 | ||
# TODO: ブロックが大きすぎるので別メソッドに切り出す | # TODO: ブロックが大きすぎるので別メソッドに切り出す | ||
data_url: str | list[str] | None = video.get('data-url') | |||
assert isinstance(data_url, str) | |||
matched: Match[str] | None = re.search( | |||
r'[^\/]+$', data_url) | |||
assert matched is not None | |||
media_url: str = unquote(matched.group(0)) | |||
tweet_id: str = tweet_url.split('/')[-1] | tweet_id: str = tweet_url.split('/')[-1] | ||
# 動画のダウンロード | # 動画のダウンロード | ||
1,194行目: | 1,203行目: | ||
quote_txt: str = '' | quote_txt: str = '' | ||
if tweet_quote is not None: | if tweet_quote is not None: | ||
quote_link: Tag | None = tweet_quote.select_one('.quote-link') | |||
assert quote_link is not None | |||
link: str | list[str] | None = quote_link.get('href') | |||
assert isinstance(link, str) | |||
link = re.sub('#.*$', '', link) | link = re.sub('#.*$', '', link) | ||
link = urljoin(self.TWITTER_URL, link) | link = urljoin(self.TWITTER_URL, link) | ||
1,219行目: | 1,231行目: | ||
poll_meters: Final[ResultSet[Tag]] = tweet_poll.select( | poll_meters: Final[ResultSet[Tag]] = tweet_poll.select( | ||
'.poll-meter') | '.poll-meter') | ||
poll_info: Tag | None = tweet_poll.select_one('.poll-info') | |||
assert poll_info is not None | |||
for poll_meter in poll_meters: | for poll_meter in poll_meters: | ||
poll_choice_value: Tag | None = poll_meter.select_one( | |||
'.poll-choice-value') | |||
assert poll_choice_value is not None | |||
ratio: str = poll_choice_value.text | |||
poll_choice_option: Tag | None = poll_meter.select_one( | |||
'.poll-choice-option') | |||
assert poll_choice_option is not None | |||
if 'leader' in poll_meter['class']: | if 'leader' in poll_meter['class']: | ||
poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgba(29, 155, 240, 0.58) 0 {ratio}, transparent {ratio} 100%); font-weight: bold;">' + ratio + ' ' + | poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgba(29, 155, 240, 0.58) 0 {ratio}, transparent {ratio} 100%); font-weight: bold;">' + \ | ||
ratio + ' ' + poll_choice_option.text + '</span>' | |||
else: | else: | ||
poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgb(207, 217, 222) 0 {ratio}, transparent {ratio} 100%);">' + ratio + ' ' + | poll_txt += f'<br>\n <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgb(207, 217, 222) 0 {ratio}, transparent {ratio} 100%);">' + \ | ||
ratio + ' ' + poll_choice_option.text + '</span>' | |||
poll_txt += '<br>\n <span style="font-size: small;">' + \ | poll_txt += '<br>\n <span style="font-size: small;">' + \ | ||
poll_info.text + '</span>' | |||
return poll_txt | return poll_txt | ||
1,257行目: | 1,277行目: | ||
def _get_tweet(self, accessor: AccessorHandler) -> None | NoReturn: | def _get_tweet(self, accessor: AccessorHandler) -> None | NoReturn: | ||
"""ページからツイート本文を `` | """ページからツイート本文を ``TableBuilder`` インスタンスに収めていく。 | ||
ツイートの記録を中断するための文を発見するか、記録したツイート数が :const:`~LIMIT_N_TWEETS` 件に達したら終了する。 | ツイートの記録を中断するための文を発見するか、記録したツイート数が :const:`~LIMIT_N_TWEETS` 件に達したら終了する。 | ||
1,269行目: | 1,289行目: | ||
soup) # 一ツイートのブロックごとにリストで取得 | soup) # 一ツイートのブロックごとにリストで取得 | ||
for tweet in tweets: # 一ツイート毎に処理 | for tweet in tweets: # 一ツイート毎に処理 | ||
tweet_a: Tag | None = tweet.a | |||
assert tweet_a is not None | |||
if tweet_a.text == self.NEWEST: | |||
# Load Newestのボタンは処理しない | # Load Newestのボタンは処理しない | ||
continue | continue | ||
1,288行目: | 1,310行目: | ||
continue | continue | ||
tweet_link: Tag | NavigableString | None = tweet.find( | |||
class_='tweet-link') | |||
assert isinstance(tweet_link, Tag) | |||
href: str | list[str] | None = tweet_link.get('href') | |||
assert isinstance(href, str) | |||
tweet_url: str = urljoin( | tweet_url: str = urljoin( | ||
self.TWITTER_URL, | self.TWITTER_URL, | ||
re.sub( | re.sub('#[^#]*$', '', href)) | ||
date: datetime = self._tweet_date(tweet) | date: datetime = self._tweet_date(tweet) | ||
self._table_builder.next_day_if_necessary(date) | self._table_builder.next_day_if_necessary(date) | ||
archived_tweet_url: str = self._callinshowlink_url( | archived_tweet_url: str = self._callinshowlink_url( | ||
tweet_url, accessor) | tweet_url, accessor) | ||
tweet_content: Tag = tweet.find( | tweet_content: Tag | NavigableString | None = tweet.find( | ||
class_='tweet-content media-body') | class_='tweet-content media-body') | ||
assert isinstance(tweet_content, Tag) | |||
self._archive_soup(tweet_content, accessor) | self._archive_soup(tweet_content, accessor) | ||
media_txt: str = self._get_tweet_media(tweet, accessor) | media_txt: str = self._get_tweet_media(tweet, accessor) | ||
1,444行目: | 1,468行目: | ||
else: | else: | ||
soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') | soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser') | ||
content: Tag = soup.find( | content: Tag | NavigableString | None = soup.find( | ||
id='CONTENT') # archive.todayの魚拓一覧ページの中身だけ取得 | id='CONTENT') # archive.todayの魚拓一覧ページの中身だけ取得 | ||
if content is None or content.get_text()[:len( | if content is None or content.get_text()[:len( | ||
1,450行目: | 1,474行目: | ||
logger.warning(url + 'の魚拓がない。これはいけない。') | logger.warning(url + 'の魚拓がない。これはいけない。') | ||
else: | else: | ||
assert isinstance(content, Tag) | |||
content_a: Tag | NavigableString | None = content.find('a') | |||
assert isinstance(content_a, Tag) | |||
href: str | list[str] | None = content_a.get('href') | |||
assert isinstance(href, str) | |||
archive_url = href.replace( | |||
self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD) | self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD) | ||
return archive_url | return archive_url | ||
1,468行目: | 1,497行目: | ||
for show_more in show_mores: # show-moreに次ページへのリンクか前ページへのリンクがある | for show_more in show_mores: # show-moreに次ページへのリンクか前ページへのリンクがある | ||
if show_more.text != self.NEWEST: # 前ページへのリンクではないか判定 | if show_more.text != self.NEWEST: # 前ページへのリンクではないか判定 | ||
show_more_a: Tag | None = show_more.a | |||
assert show_more_a is not None | |||
href: str | list[str] | None = show_more_a.get('href') | |||
assert isinstance(href, str) | |||
new_url = urljoin( | new_url = urljoin( | ||
self.NITTER_INSTANCE, | self.NITTER_INSTANCE, | ||
1,473行目: | 1,506行目: | ||
+ '/' | + '/' | ||
+ self.TWEETS_OR_REPLIES | + self.TWEETS_OR_REPLIES | ||
+ | + href) # 直下のaタグのhrefの中身取ってURL頭部分と合体 | ||
res: Final[str | None] = accessor.request(new_url) | res: Final[str | None] = accessor.request(new_url) | ||
if res is None: | if res is None: |