「利用者:夜泣き/スクリプト」の版間の差分

→‎コード: v4.0.4 SeleniumAccessor.quit()がself._driverの未定義時にエラーを吐かないよう修正
>Fet-Fe
(→‎コード: v4.0.3 pythonのバージョン指定を3.11以上に変更テーブル組み立てを別のクラスに逃す)
>Fet-Fe
(→‎コード: v4.0.4 SeleniumAccessor.quit()がself._driverの未定義時にエラーを吐かないよう修正)
7行目: 7行目:
"""Twitter自動収集スクリプト
"""Twitter自動収集スクリプト


ver4.0.3 2023/9/28恒心
ver4.0.4 2023/9/29恒心


当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です
60行目: 60行目:
from datetime import datetime
from datetime import datetime
from logging import Logger, getLogger
from logging import Logger, getLogger
from re import Match
from time import sleep
from time import sleep
from types import MappingProxyType, TracebackType
from types import MappingProxyType, TracebackType
154行目: 155行目:
     """
     """


     def __init__(self):
     def __init__(self) -> None:
         """コンストラクタ。
         """コンストラクタ。
         """
         """
316行目: 317行目:
     """
     """


     def __init__(self, enable_javascript: bool):
     def __init__(self, enable_javascript: bool) -> None:
         """コンストラクタ。
         """コンストラクタ。


332行目: 333行目:
             logger.warning('reCAPTCHA対策のためJavaScriptをonにしますを')
             logger.warning('reCAPTCHA対策のためJavaScriptをonにしますを')


         options.preferences.update({
         options.preferences.update({ # type: ignore
             'javascript.enabled': enable_javascript,
             'javascript.enabled': enable_javascript,
             'intl.accept_languages': 'en-US, en',
             'intl.accept_languages': 'en-US, en',
341行目: 342行目:


         try:
         try:
             self.driver: webdriver.Firefox = webdriver.Firefox(options=options)
             self._driver: webdriver.Firefox = webdriver.Firefox(
                options=options)
             sleep(1)
             sleep(1)
             wait_init: WebDriverWait = WebDriverWait(self.driver,
             wait_init: WebDriverWait = WebDriverWait(self._driver,
                                                     self.WAIT_TIME_FOR_INIT)
                                                     self.WAIT_TIME_FOR_INIT)
             wait_init.until(
             wait_init.until( # type: ignore
                 ec.element_to_be_clickable((By.ID, 'connectButton'))
                 ec.element_to_be_clickable((By.ID, 'connectButton'))
             )
             )
             self.driver.find_element(By.ID, 'connectButton').click()
             self._driver.find_element(By.ID, 'connectButton').click()
             wait_init.until(ec.url_contains('about:blank'))  # Torの接続が完了するまで待つ
            # Torの接続が完了するまで待つ
             wait_init.until(ec.url_contains('about:blank'))  # type: ignore


             self.wait: WebDriverWait = WebDriverWait(self.driver,
             self.wait: WebDriverWait = WebDriverWait(self._driver,
                                                     self.REQUEST_TIMEOUT)
                                                     self.REQUEST_TIMEOUT)
         except BaseException:
         except BaseException:
360行目: 363行目:
         """Seleniumドライバを終了する。
         """Seleniumドライバを終了する。
         """
         """
         if self.driver:
         if hasattr(self, 'driver'):
             self.driver.quit()
             self._driver.quit()


     def _check_recaptcha(self) -> None:
     def _check_recaptcha(self) -> None:
373行目: 376行目:
         """
         """
         try:
         try:
             self.driver.find_element(  # 要素がない時に例外を吐く
             self._driver.find_element(  # 要素がない時に例外を吐く
                 By.CSS_SELECTOR,
                 By.CSS_SELECTOR,
                 'script[src^="https://www.google.com/recaptcha/api.js"]')
                 'script[src^="https://www.google.com/recaptcha/api.js"]')
380行目: 383行目:
                 logger.warning('botバレしたらNew Tor circuit for this siteを選択するナリよ')
                 logger.warning('botバレしたらNew Tor circuit for this siteを選択するナリよ')
                 WebDriverWait(
                 WebDriverWait(
                     self.driver,
                     self._driver,
                     10000).until(
                     10000).until( # type: ignore
                     ec.staleness_of(
                     ec.staleness_of(
                         self.driver.find_element(
                         self._driver.find_element(
                             By.CSS_SELECTOR,
                             By.CSS_SELECTOR,
                             'script[src^="https://www.google.com/recaptcha/api.js"]')))
                             'script[src^="https://www.google.com/recaptcha/api.js"]')))
390行目: 393行目:
                 raise ReCaptchaRequiredError(
                 raise ReCaptchaRequiredError(
                     'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: '
                     'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: '
                     + self.driver.current_url)
                     + self._driver.current_url)
         except NoSuchElementException:
         except NoSuchElementException:
             # reCAPTCHAの要素がなければそのまま
             # reCAPTCHAの要素がなければそのまま
406行目: 409行目:
         self._random_sleep()  # DoS対策で待つ
         self._random_sleep()  # DoS対策で待つ
         try:
         try:
             self.driver.get(url)
             self._driver.get(url)
             self._check_recaptcha()
             self._check_recaptcha()
         except WebDriverException as e:
         except WebDriverException as e:
             # Selenium固有の例外を共通の例外に変換
             # Selenium固有の例外を共通の例外に変換
             raise AccessError(str(e)) from e
             raise AccessError(str(e)) from e
         return self.driver.page_source
         return self._driver.page_source




428行目: 431行目:
     """
     """


     def __init__(self, use_browser: bool, enable_javascript: bool):
     def __init__(self, use_browser: bool, enable_javascript: bool) -> None:
         """コンストラクタ。
         """コンストラクタ。


576行目: 579行目:


class TableBuilder:
class TableBuilder:
     def __init__(self, date: datetime):
     def __init__(self, date: datetime) -> None:
         self._tables: list[str] = ['']
         self._tables: list[str] = ['']
         self._count: int = 0  # 記録数
         self._count: int = 0  # 記録数
613行目: 616行目:
             self._tables[0] = self._convert_to_text_table(self._tables[0])
             self._tables[0] = self._convert_to_text_table(self._tables[0])
             if os.name == 'nt':  # Windows
             if os.name == 'nt':  # Windows
                 self._txt_data[0] = self._date.strftime(
                 self._tables[0] = self._date.strftime(
                     '\n=== %#m月%#d日 ===\n') + self._txt_data[0]
                     '\n=== %#m月%#d日 ===\n') + self._tables[0]
                 logger.info(self._date.strftime('%#m月%#d日のツイートを取得完了ですを'))
                 logger.info(self._date.strftime('%#m月%#d日のツイートを取得完了ですを'))
             else:  # Mac or Linux
             else:  # Mac or Linux
625行目: 628行目:


     def _convert_to_text_table(self, text: str) -> str:
     def _convert_to_text_table(self, text: str) -> str:
         """``self._txt_data[0]`` にwikiでテーブル表示にするためのヘッダとフッタをつける。
         """``self._tables[0]`` にwikiでテーブル表示にするためのヘッダとフッタをつける。


         Args:
         Args:
662行目: 665行目:
                         i += 2
                         i += 2
                 else:
                 else:
                     if text[i:i + 10] == '{{Archive|' or text[i:i + 10] == '{{archive|':
                     if (text[i:i + 10] == '{{Archive|'
                            or text[i:i + 10] == '{{archive|'):
                         is_in_archive_template = True
                         is_in_archive_template = True
                         i += 10
                         i += 10
807行目: 811行目:
     """
     """


     def __init__(self):
     def __init__(self) -> None:
         """コンストラクタ。
         """コンストラクタ。
         """
         """
1,050行目: 1,054行目:
         date_str: str | list[str] | None = tweet_date_a.get('title')
         date_str: str | list[str] | None = tweet_date_a.get('title')
         assert isinstance(date_str, str)
         assert isinstance(date_str, str)
         date: datetime = datetime.strptime(
         return datetime.strptime(
             date_str,
             date_str,
             '%b %d, %Y · %I:%M %p %Z').replace(
             '%b %d, %Y · %I:%M %p %Z').replace(
             tzinfo=ZoneInfo('UTC')).astimezone(
             tzinfo=ZoneInfo('UTC')).astimezone(
             ZoneInfo('Asia/Tokyo'))
             ZoneInfo('Asia/Tokyo'))
        return date


     def _get_tweet_media(
     def _get_tweet_media(
1,078行目: 1,081行目:
             for image_a in tweet_media.select('.attachment.image a'):
             for image_a in tweet_media.select('.attachment.image a'):
                 try:
                 try:
                    href: str | list[str] | None = image_a.get('href')
                    assert isinstance(href, str)
                     media_name: str = [
                     media_name: str = [
                         group for group in re.search(
                         group for group in re.search(
                             r'%2F([^%]*\.jpg)|%2F([^%]*\.jpeg)|%2F([^%]*\.png)|%2F([^%]*\.gif)',
                             r'%2F([^%]*\.jpg)|%2F([^%]*\.jpeg)|%2F([^%]*\.png)|%2F([^%]*\.gif)',
                             image_a.get('href')).groups() if group][0]
                             href).groups() if group][0]
                     media_list.append(f'[[ファイル:{media_name}|240px]]')
                     media_list.append(f'[[ファイル:{media_name}|240px]]')
                     if self._download_media(media_name, accessor):
                     if self._download_media(media_name, accessor):
1,096行目: 1,101行目:
                             + ' をアップロードしなければない。')
                             + ' をアップロードしなければない。')
                 except AttributeError:
                 except AttributeError:
                    tweet_link: Tag | NavigableString | None = tweet.find(
                        class_='tweet-link')
                    assert isinstance(tweet_link, Tag)
                    href: str | list[str] | None = tweet_link.get('href')
                    assert isinstance(href, str)
                     tweet_url: str = urljoin(
                     tweet_url: str = urljoin(
                         self.TWITTER_URL,
                         self.TWITTER_URL,
                         re.sub(
                         re.sub('#[^#]*$', '', href))
                            '#[^#]*$',
                            '',
                            tweet.find(
                                class_='tweet-link').get('href')))
                     logger.error(f'{tweet_url}の画像が取得できませんでしたを 当職無能')
                     logger.error(f'{tweet_url}の画像が取得できませんでしたを 当職無能')
                     media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]')
                     media_list.append('[[ファイル:(画像の取得ができませんでした)|240px]]')
1,108行目: 1,114行目:
             for i, video_container in enumerate(
             for i, video_container in enumerate(
                     tweet_media.select('.attachment.video-container')):
                     tweet_media.select('.attachment.video-container')):
                tweet_link: Tag | NavigableString | None = tweet.find(
                    class_='tweet-link')
                assert isinstance(tweet_link, Tag)
                href: str | list[str] | None = tweet_link.get('href')
                assert isinstance(href, str)
                 tweet_url: str = urljoin(
                 tweet_url: str = urljoin(
                     self.TWITTER_URL,
                     self.TWITTER_URL,
                     re.sub(
                     re.sub('#[^#]*$', '', href))  # ツイートのURL作成
                        '#[^#]*$',
                        '',
                        tweet.find(
                            class_='tweet-link').get('href')))  # ツイートのURL作成
                 video = video_container.select_one('video')
                 video = video_container.select_one('video')
                 if video is None:
                 if video is None:
1,129行目: 1,136行目:
                 else:  # ffmpegがある場合
                 else:  # ffmpegがある場合
                     # TODO: ブロックが大きすぎるので別メソッドに切り出す
                     # TODO: ブロックが大きすぎるので別メソッドに切り出す
                     media_url: str = unquote(
                     data_url: str | list[str] | None = video.get('data-url')
                        re.search(
                    assert isinstance(data_url, str)
                            r'[^\/]+$',
                    matched: Match[str] | None = re.search(
                            video.get('data-url')).group(0))
                        r'[^\/]+$', data_url)
                    assert matched is not None
                    media_url: str = unquote(matched.group(0))
                     tweet_id: str = tweet_url.split('/')[-1]
                     tweet_id: str = tweet_url.split('/')[-1]
                     # 動画のダウンロード
                     # 動画のダウンロード
1,194行目: 1,203行目:
         quote_txt: str = ''
         quote_txt: str = ''
         if tweet_quote is not None:
         if tweet_quote is not None:
             link: str = tweet_quote.select_one('.quote-link').get('href')
             quote_link: Tag | None = tweet_quote.select_one('.quote-link')
            assert quote_link is not None
            link: str | list[str] | None = quote_link.get('href')
            assert isinstance(link, str)
             link = re.sub('#.*$', '', link)
             link = re.sub('#.*$', '', link)
             link = urljoin(self.TWITTER_URL, link)
             link = urljoin(self.TWITTER_URL, link)
1,219行目: 1,231行目:
             poll_meters: Final[ResultSet[Tag]] = tweet_poll.select(
             poll_meters: Final[ResultSet[Tag]] = tweet_poll.select(
                 '.poll-meter')
                 '.poll-meter')
            poll_info: Tag | None = tweet_poll.select_one('.poll-info')
            assert poll_info is not None
             for poll_meter in poll_meters:
             for poll_meter in poll_meters:
                 ratio: str = poll_meter.select_one('.poll-choice-value').text
                 poll_choice_value: Tag | None = poll_meter.select_one(
                    '.poll-choice-value')
                assert poll_choice_value is not None
                ratio: str = poll_choice_value.text
                poll_choice_option: Tag | None = poll_meter.select_one(
                    '.poll-choice-option')
                assert poll_choice_option is not None
                 if 'leader' in poll_meter['class']:
                 if 'leader' in poll_meter['class']:
                     poll_txt += f'<br>\n&nbsp; <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgba(29, 155, 240, 0.58) 0 {ratio}, transparent {ratio} 100%); font-weight: bold;">' + ratio + ' ' + poll_meter.select_one(
                     poll_txt += f'<br>\n&nbsp; <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgba(29, 155, 240, 0.58) 0 {ratio}, transparent {ratio} 100%); font-weight: bold;">' + \
                        '.poll-choice-option').text + '</span>'
                        ratio + ' ' + poll_choice_option.text + '</span>'
                 else:
                 else:
                     poll_txt += f'<br>\n&nbsp; <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgb(207, 217, 222) 0 {ratio}, transparent {ratio} 100%);">' + ratio + ' ' + poll_meter.select_one(
                     poll_txt += f'<br>\n&nbsp; <span style="display: inline-block; width: 30em; background: linear-gradient(to right, rgb(207, 217, 222) 0 {ratio}, transparent {ratio} 100%);">' + \
                        '.poll-choice-option').text + '</span>'
                        ratio + ' ' + poll_choice_option.text + '</span>'
             poll_txt += '<br>\n&nbsp; <span style="font-size: small;">' + \
             poll_txt += '<br>\n&nbsp; <span style="font-size: small;">' + \
                 tweet_poll.select_one('.poll-info').text + '</span>'
                 poll_info.text + '</span>'
         return poll_txt
         return poll_txt


1,257行目: 1,277行目:


     def _get_tweet(self, accessor: AccessorHandler) -> None | NoReturn:
     def _get_tweet(self, accessor: AccessorHandler) -> None | NoReturn:
         """ページからツイート本文を ``self._txt_data`` に収めていく。
         """ページからツイート本文を ``TableBuilder`` インスタンスに収めていく。


         ツイートの記録を中断するための文を発見するか、記録したツイート数が :const:`~LIMIT_N_TWEETS` 件に達したら終了する。
         ツイートの記録を中断するための文を発見するか、記録したツイート数が :const:`~LIMIT_N_TWEETS` 件に達したら終了する。
1,269行目: 1,289行目:
             soup)  # 一ツイートのブロックごとにリストで取得
             soup)  # 一ツイートのブロックごとにリストで取得
         for tweet in tweets:  # 一ツイート毎に処理
         for tweet in tweets:  # 一ツイート毎に処理
             if tweet.a.text == self.NEWEST:
             tweet_a: Tag | None = tweet.a
            assert tweet_a is not None
            if tweet_a.text == self.NEWEST:
                 # Load Newestのボタンは処理しない
                 # Load Newestのボタンは処理しない
                 continue
                 continue
1,288行目: 1,310行目:
                     continue
                     continue


            tweet_link: Tag | NavigableString | None = tweet.find(
                class_='tweet-link')
            assert isinstance(tweet_link, Tag)
            href: str | list[str] | None = tweet_link.get('href')
            assert isinstance(href, str)
             tweet_url: str = urljoin(
             tweet_url: str = urljoin(
                 self.TWITTER_URL,
                 self.TWITTER_URL,
                 re.sub(
                 re.sub('#[^#]*$', '', href))
                    '#[^#]*$',
                    '',
                    tweet.find(
                        class_='tweet-link').get('href')))
             date: datetime = self._tweet_date(tweet)
             date: datetime = self._tweet_date(tweet)
             self._table_builder.next_day_if_necessary(date)
             self._table_builder.next_day_if_necessary(date)
             archived_tweet_url: str = self._callinshowlink_url(
             archived_tweet_url: str = self._callinshowlink_url(
                 tweet_url, accessor)
                 tweet_url, accessor)
             tweet_content: Tag = tweet.find(
             tweet_content: Tag | NavigableString | None = tweet.find(
                 class_='tweet-content media-body')
                 class_='tweet-content media-body')
            assert isinstance(tweet_content, Tag)
             self._archive_soup(tweet_content, accessor)
             self._archive_soup(tweet_content, accessor)
             media_txt: str = self._get_tweet_media(tweet, accessor)
             media_txt: str = self._get_tweet_media(tweet, accessor)
1,444行目: 1,468行目:
         else:
         else:
             soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser')
             soup: Final[BeautifulSoup] = BeautifulSoup(res, 'html.parser')
             content: Tag = soup.find(
             content: Tag | NavigableString | None = soup.find(
                 id='CONTENT')  # archive.todayの魚拓一覧ページの中身だけ取得
                 id='CONTENT')  # archive.todayの魚拓一覧ページの中身だけ取得
             if content is None or content.get_text()[:len(
             if content is None or content.get_text()[:len(
1,450行目: 1,474行目:
                 logger.warning(url + 'の魚拓がない。これはいけない。')
                 logger.warning(url + 'の魚拓がない。これはいけない。')
             else:
             else:
                 archive_url = content.find('a').get('href').replace(
                 assert isinstance(content, Tag)
                content_a: Tag | NavigableString | None = content.find('a')
                assert isinstance(content_a, Tag)
                href: str | list[str] | None = content_a.get('href')
                assert isinstance(href, str)
                archive_url = href.replace(
                     self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD)
                     self.ARCHIVE_TODAY, self.ARCHIVE_TODAY_STANDARD)
         return archive_url
         return archive_url
1,468行目: 1,497行目:
         for show_more in show_mores:  # show-moreに次ページへのリンクか前ページへのリンクがある
         for show_more in show_mores:  # show-moreに次ページへのリンクか前ページへのリンクがある
             if show_more.text != self.NEWEST:  # 前ページへのリンクではないか判定
             if show_more.text != self.NEWEST:  # 前ページへのリンクではないか判定
                show_more_a: Tag | None = show_more.a
                assert show_more_a is not None
                href: str | list[str] | None = show_more_a.get('href')
                assert isinstance(href, str)
                 new_url = urljoin(
                 new_url = urljoin(
                     self.NITTER_INSTANCE,
                     self.NITTER_INSTANCE,
1,473行目: 1,506行目:
                     + '/'
                     + '/'
                     + self.TWEETS_OR_REPLIES
                     + self.TWEETS_OR_REPLIES
                     + show_more.a.get('href'))  # 直下のaタグのhrefの中身取ってURL頭部分と合体
                     + href)  # 直下のaタグのhrefの中身取ってURL頭部分と合体
         res: Final[str | None] = accessor.request(new_url)
         res: Final[str | None] = accessor.request(new_url)
         if res is None:
         if res is None:
匿名利用者