「利用者:夜泣き/スクリプト」の版間の差分

→‎コード: v4.1.1 --search-unarchivedでリツイートを除く処理を追加
>Fet-Fe
(→‎コード: v4.1.0 wikiに掲載されていないツイートのURLをarchive.isから取得する機能を追加)
>Fet-Fe
(→‎コード: v4.1.1 --search-unarchivedでリツイートを除く処理を追加)
7行目: 7行目:
"""Twitter自動収集スクリプト
"""Twitter自動収集スクリプト


ver4.1.0 2023/10/9恒心
ver4.1.1 2023/10/22恒心


当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です
当コードは恒心停止してしまった https://rentry.co/7298g の降臨ショーツイート自動収集スクリプトの復刻改善版です
22行目: 22行目:


         $ python3 (ファイル名) --krsw
         $ python3 (ファイル名) --krsw
    ``--no_browser`` オプションでTor Browserを使用しないモードに、
    ``--disable_script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。
    ``--search_unarchived`` オプションでは、`archive.today <https://archive.today>`_ から
    Wikiに未掲載のツイートのURLを収集するモードになります。


     自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。
     自動モードではユーザーは降臨ショー、クエリはなし、取得上限まで自動です。
     つまりユーザー入力が要りません。
     つまりユーザー入力が要りません。
    ``--no-browser`` オプションでTor Browserを使用しないモードに、
    ``--disable-script`` オプションでTor BrowserのJavaScriptを使用しないモードになります。
    ``--search-unarchived`` オプションでは、`archive.today <https://archive.today>`_ から
    Wikiに未掲載のツイートのURLを収集するモードになります。


Note:
Note:
36行目: 35行目:
     * 環境は玉葱前提です。
     * 環境は玉葱前提です。


         * Tor Browserを入れておくか、torコマンドでプロキシを立てておくことが必要です。
         * TailsやWhonixでない場合、Tor Browserを入れておくか、torコマンドでプロキシを立てておくことが必要です。


     * Whonix-Workstation, MacOSで動作確認済
     * Whonix-Workstation, MacOSで動作確認済
68行目: 67行目:
from enum import Enum
from enum import Enum
from logging import Logger, getLogger
from logging import Logger, getLogger
from re import Match
from re import Match, Pattern
from time import sleep
from time import sleep
from types import MappingProxyType, TracebackType
from types import MappingProxyType, TracebackType
79行目: 78行目:
from bs4.element import NavigableString, ResultSet, Tag
from bs4.element import NavigableString, ResultSet, Tag
from selenium import webdriver
from selenium import webdriver
from selenium.common.exceptions import (NoSuchElementException,
from selenium.common.exceptions import WebDriverException
                                        WebDriverException)
from selenium.webdriver.common.by import By
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.firefox.options import Options as FirefoxOptions
336行目: 334行目:
             enable_javascript (bool): JavaScriptを有効にするかどうか。reCAPTCHA対策に必要。
             enable_javascript (bool): JavaScriptを有効にするかどうか。reCAPTCHA対策に必要。
         """
         """
         self._javascript_enabled: Final[bool] = enable_javascript
         self._options: Final[FirefoxOptions] = FirefoxOptions()
 
         self._options.binary_location = (
        options: Final[FirefoxOptions] = FirefoxOptions()
            self.TOR_BROWSER_PATHS[platform.system()]
         options.binary_location = self.TOR_BROWSER_PATHS[platform.system()]
        )


         if enable_javascript:
         if enable_javascript:
             logger.warning('reCAPTCHA対策のためJavaScriptをonにしますを')
             logger.warning('reCAPTCHA対策のためJavaScriptをonにしますを')


         options.preferences.update({  # type: ignore
         self._options.preferences.update({  # type: ignore
             'javascript.enabled': enable_javascript,
             'javascript.enabled': enable_javascript,
             'intl.accept_languages': 'en-US, en',
             'intl.accept_languages': 'en-US, en',
351行目: 349行目:
             'dom.webdriver.enabled': False  # 自動操縦と見破られないための設定
             'dom.webdriver.enabled': False  # 自動操縦と見破られないための設定
         })
         })
        self._refresh_browser()


    def quit(self) -> None:
        """Seleniumドライバを終了する。
        """
        if hasattr(self, '_driver'):
            logger.debug('ブラウザ終了')
            self._driver.quit()
    def _refresh_browser(self) -> None:
        """ブラウザを起動する。
        """
         try:
         try:
             self._driver: Final[webdriver.Firefox] = webdriver.Firefox(
            logger.debug('ブラウザ起動')
                 options=options)
             self._driver: webdriver.Firefox = webdriver.Firefox(
                 options=self._options)
             sleep(1)
             sleep(1)
             wait_init: Final[WebDriverWait] = WebDriverWait(
             wait_init: Final[WebDriverWait] = WebDriverWait(
368行目: 378行目:
             self.quit()
             self.quit()
             raise
             raise
    def quit(self) -> None:
        """Seleniumドライバを終了する。
        """
        if hasattr(self, '_driver'):
            self._driver.quit()


     def _check_recaptcha(self, url: str) -> None:
     def _check_recaptcha(self, url: str) -> None:
388行目: 392行目:
             botバレしたときに自動で他のTorサーキットに接続し直す。
             botバレしたときに自動で他のTorサーキットに接続し直す。
         """
         """
         try:
         if len(self._driver.find_elements(By.ID, 'g-recaptcha')) > 0:
            self._driver.find_element(By.ID, 'g-recaptcha') # 要素がない時に例外を吐く
             if self._options.preferences.get('javascript.enabled'):  # type: ignore
             if self._javascript_enabled:
                 logger.warning(f'{url} でreCAPTCHAが要求されたナリ')
                 logger.warning(f'{url} でreCAPTCHAが要求されたナリ')
                 print('reCAPTCHAを解いてね(笑)、それはできるよね。')
                 print('reCAPTCHAを解いてね(笑)、それはできるよね。')
399行目: 402行目:
                     ec.staleness_of(
                     ec.staleness_of(
                         self._driver.find_element(By.ID, 'g-recaptcha')))
                         self._driver.find_element(By.ID, 'g-recaptcha')))
                 sleep(self.WAIT_TIME)  # DoS対策で待つ
                 self._random_sleep()  # DoS対策で待つ
                self._driver.get(url)
             else:
             else:
                 raise ReCaptchaRequiredError(
                 raise ReCaptchaRequiredError(
                     f'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: {url}')
                     f'JavaScriptがオフの状態でreCAPTCHAが要求されました。 URL: {url}')
        except NoSuchElementException:
            # reCAPTCHAの要素がなければそのまま
            pass
        self._random_sleep()
        self._driver.get(url)


     def get(self, url: str) -> str:
     def get(self, url: str) -> str:
691行目: 690行目:
         return '{|class="wikitable" style="text-align: left;"\n' + text + '|}'
         return '{|class="wikitable" style="text-align: left;"\n' + text + '|}'


     @staticmethod
     @classmethod
     def escape_wiki_reserved_words(text: str) -> str:
     def escape_wiki_reserved_words(cls, text: str) -> str:
         """MediaWikiの文法と衝突する文字を無効化する。
         """MediaWikiの文法と衝突する文字を無効化する。


735行目: 734行目:
             return text
             return text


         text = text.replace('\n', '<br>\n')
         if not hasattr(cls, '_escape_callables'):
        text = re.sub(r'^ ', '&nbsp;', text, flags=re.MULTILINE)
            # 初回呼び出しの時だけ正規表現をコンパイルする
        text = re.sub(
            patterns: tuple[Pattern[str], ...] = (
            r'^([\*#:;])',
                re.compile(r'^ ', re.MULTILINE),
            r'<nowiki>\1</nowiki>',
                re.compile(r'^([\*#:;])', re.MULTILINE),
             text,
                re.compile(r'^----', re.MULTILINE)
             flags=re.MULTILINE)
             )
        text = re.sub(
 
            r'^----',
             cls._escape_callables: tuple[Callable[[str], str], ...] = (
            '<nowiki>----</nowiki>',
                lambda t: t.replace('\n', '<br>\n'),
             text,
                lambda t: patterns[0].sub('&nbsp;', t),
             flags=re.MULTILINE)
                lambda t: patterns[1].sub(r'<nowiki>\1</nowiki>', t),
                lambda t: patterns[2].sub('<nowiki>----</nowiki>', t)
             )
 
        for escape_callable in cls._escape_callables:
             text = escape_callable(text)
         text = escape_nolink_urls(text)
         text = escape_nolink_urls(text)
         return text
         return text
904行目: 908行目:
         self._check_slash()  # スラッシュが抜けてないかチェック
         self._check_slash()  # スラッシュが抜けてないかチェック
         self._has_ffmpeg: Final[bool] = self._check_ffmpeg()  # ffmpegがあるかチェック
         self._has_ffmpeg: Final[bool] = self._check_ffmpeg()  # ffmpegがあるかチェック
        self._img_ext_pattern: Final[Pattern[str]] = re.compile(
            r'%2F([^%]*\.(?:jpg|jpeg|png|gif))')
        self._url_fragment_pattern: Final[Pattern[str]] = re.compile('#[^#]*$')
        self._url_query_pattern: Final[Pattern[str]] = re.compile(r'\?.*$')


     def _set_queries(self, accessor: AccessorHandler, krsw: bool) -> bool:
     def _set_queries(self, accessor: AccessorHandler, krsw: bool) -> bool:
1,253行目: 1,262行目:
                     href: str | list[str] | None = image_a.get('href')
                     href: str | list[str] | None = image_a.get('href')
                     assert isinstance(href, str)
                     assert isinstance(href, str)
                     matched: Match[str] | None = re.search(
                     matched: Match[str] | None = self._img_ext_pattern.search(
                        r'%2F([^%]*\.(?:jpg|jpeg|png|gif))',
                         href)
                         href)
                     assert matched is not None
                     assert matched is not None
1,341行目: 1,349行目:
             link: str | list[str] | None = quote_link.get('href')
             link: str | list[str] | None = quote_link.get('href')
             assert isinstance(link, str)
             assert isinstance(link, str)
             link = re.sub('#.*$', '', link)
             link = self._url_fragment_pattern.sub('', link)
             link = urljoin(self.TWITTER_URL, link)
             link = urljoin(self.TWITTER_URL, link)
             quote_txt = self._archive_url(link, accessor)
             quote_txt = self._archive_url(link, accessor)
1,444行目: 1,452行目:
                     url_link: str = href.replace(
                     url_link: str = href.replace(
                         'https' + self.NITTER_INSTANCE[4:], self.TWITTER_URL)
                         'https' + self.NITTER_INSTANCE[4:], self.TWITTER_URL)
                     url_link = re.sub(r'\?.*$', '', url_link)
                     url_link = self._url_query_pattern.sub('', url_link)
                     url.replace_with(self._archive_url(url_link, accessor))
                     url.replace_with(self._archive_url(url_link, accessor))
                 elif href.startswith('https://nitter.kavin.rocks/'):
                 elif href.startswith('https://nitter.kavin.rocks/'):
1,450行目: 1,458行目:
                     url_link: str = href.replace(
                     url_link: str = href.replace(
                         'https://nitter.kavin.rocks/', self.TWITTER_URL)
                         'https://nitter.kavin.rocks/', self.TWITTER_URL)
                     url_link = re.sub(r'\?.*$', '', url_link)
                     url_link = self._url_query_pattern.sub('', url_link)
                     url.replace_with(self._archive_url(url_link, accessor))
                     url.replace_with(self._archive_url(url_link, accessor))
                 elif (hasattr(self, '_invidious_pattern')
                 elif (hasattr(self, '_invidious_pattern')
1,607行目: 1,615行目:
             tweet_url: Final[str] = urljoin(
             tweet_url: Final[str] = urljoin(
                 self.TWITTER_URL,
                 self.TWITTER_URL,
                 re.sub('#[^#]*$', '', href))
                 self._url_fragment_pattern.sub('', href))


             tweet_callinshow_template: Final[str] = self._callinshowlink_url(
             tweet_callinshow_template: Final[str] = self._callinshowlink_url(
1,783行目: 1,791行目:
             'ユーザー名: @' + self._name + 'で検索しまふ'
             'ユーザー名: @' + self._name + 'で検索しまふ'
         )
         )
        self._twitter_url_pattern: Pattern[str] = re.compile(
            self.TWITTER_URL + self._name + r'/status/\d+')


         self._url_list_on_wiki: list[str] = []
         self._url_list_on_wiki: list[str] = []
1,842行目: 1,853行目:
             a_last_child: Tag | None = tweet.select_one('a:last-child')
             a_last_child: Tag | None = tweet.select_one('a:last-child')
             assert a_last_child is not None
             assert a_last_child is not None
             url_matched: Final[Match[str]] | None = re.match(
             url_matched: Final[Match[str]] | None = (
                 self.TWITTER_URL + self._name + r'/status/\d+',
                 self._twitter_url_pattern.match(a_last_child.text)
                a_last_child.text)
            )
             if url_matched is not None:
             if url_matched is not None:
                 a_first_child: Tag | None = tweet.select_one('a:first-child')
                 a_first_child: Tag | None = tweet.select_one('a:first-child')
1,913行目: 1,924行目:
         Examples:
         Examples:
             `https://twitter.com/CallinShow/status/1707` で始まるURLをすべて探索する場合
             `https://twitter.com/CallinShow/status/1707` で始まるURLをすべて探索する場合
             ::
             ::


1,920行目: 1,930行目:
             `https://twitter.com/CallinShow/status/165` で始まるURLから
             `https://twitter.com/CallinShow/status/165` で始まるURLから
             `https://twitter.com/CallinShow/status/169` で始まるURLまでをすべて探索する場合
             `https://twitter.com/CallinShow/status/169` で始まるURLまでをすべて探索する場合
             ::
             ::


1,974行目: 1,983行目:
             '%Y-%m-%dT%H:%M:%SZ')
             '%Y-%m-%dT%H:%M:%SZ')
         return raw_time.replace(tzinfo=ZoneInfo('Asia/Tokyo'))
         return raw_time.replace(tzinfo=ZoneInfo('Asia/Tokyo'))
    def _filter_out_retweets(
            self,
            url_pairs: list[UrlTuple],
            accessor: AccessorHandler) -> list[UrlTuple]:
        """リツイートを除く。
        Args:
            url_pairs (list[UrlTuple]): リツイートを除く前のURLのリスト。
            accessor (AccessorHandler): アクセスハンドラ。
        Returns:
            list[UrlTuple]: リツイートを除いたURLのリスト。
        """
        filtered_urls: list[UrlTuple] = []
        for url_pair in url_pairs:
            page: str | None = accessor.request(url_pair.archive_url)
            assert page is not None
            soup: BeautifulSoup = BeautifulSoup(page)
            if soup.select_one('span[data-testid="socialContext"]') is None:
                filtered_urls.append(url_pair)
        return filtered_urls


     @override
     @override
1,993行目: 2,026行目:
                           self.TWEET_URL_PREFIX_DEFAULT,
                           self.TWEET_URL_PREFIX_DEFAULT,
                           self.INCREMENTED_NUM_DEFAULT)
                           self.INCREMENTED_NUM_DEFAULT)
            # リツイートを除く
            filtered_url_list: list[UrlTuple] = self._filter_out_retweets(
                self._url_list, accessor)


         with codecs.open(self.FILENAME, 'w', 'utf-8') as f:
         with codecs.open(self.FILENAME, 'w', 'utf-8') as f:
             for url_pair in self._url_list:
             for url_pair in filtered_url_list:
                 f.write(url_pair.url + '\n')
                 f.write(url_pair.url + '\n')
         logger.info('テキストファイル手に入ったやで〜')
         logger.info('テキストファイル手に入ったやで〜')
2,012行目: 2,049行目:
     parser.add_argument(
     parser.add_argument(
         '-n',
         '-n',
         '--no_browser',
         '--no-browser',
         action='store_true',
         action='store_true',
         help='指定すると、Tor Browserを利用しない。')
         help='指定すると、Tor Browserを利用しない。')
     parser.add_argument(
     parser.add_argument(
         '-d',
         '-d',
         '--disable_script',
         '--disable-script',
         action='store_true',
         action='store_true',
         help='指定すると、Tor BrowserでJavaScriptを利用しない。')
         help='指定すると、Tor BrowserでJavaScriptを利用しない。')
     parser.add_argument(
     parser.add_argument(
         '-u',
         '-u',
         '--search_unarchived',
         '--search-unarchived',
         action='store_true',
         action='store_true',
         help=('指定すると、Wikiに未掲載のツイートのURLをarchive.todayから収集する。'
         help=('指定すると、Wikiに未掲載のツイートのURLをarchive.todayから収集する。'
2,029行目: 2,066行目:
     logger.debug('args: ' + str(args))
     logger.debug('args: ' + str(args))


     twitter_archiver: Final[TwitterArchiver] = ArchiveCrawler() if (
     twitter_archiver: Final[TwitterArchiver] = (
        args.search_unarchived) else TwitterArchiver()
        ArchiveCrawler() if args.search_unarchived else TwitterArchiver()
    )
     twitter_archiver.execute(
     twitter_archiver.execute(
         args.krsw,
         args.krsw,
匿名利用者