|
| 1 | +import logging |
| 2 | +from datetime import datetime |
| 3 | +from typing import List, Dict |
| 4 | + |
| 5 | +from data_source_api.basic_document import BasicDocument, DocumentType |
| 6 | +from data_source_api.base_data_source import BaseDataSource, ConfigField, HTMLInputType |
| 7 | +from data_source_api.exception import InvalidDataSourceConfig |
| 8 | +from data_source_api.utils import parse_with_workers |
| 9 | +from index_queue import IndexQueue |
| 10 | +from parsers.html import html_to_text |
| 11 | +from pydantic import BaseModel |
| 12 | +from requests import Session, HTTPError |
| 13 | +from requests.auth import AuthBase |
| 14 | +from urllib.parse import urljoin |
| 15 | +from time import sleep |
| 16 | + |
| 17 | + |
| 18 | +logger = logging.getLogger(__name__) |
| 19 | + |
| 20 | + |
| 21 | +class BookStackAuth(AuthBase): |
| 22 | + def __init__(self, token_id, token_secret, header_key="Authorization"): |
| 23 | + self.header_key = header_key |
| 24 | + self.token_id = token_id |
| 25 | + self.token_secret = token_secret |
| 26 | + |
| 27 | + def __call__(self, r): |
| 28 | + r.headers[self.header_key] = f"Token {self.token_id}:{self.token_secret}" |
| 29 | + return r |
| 30 | + |
| 31 | + |
| 32 | +class BookStack(Session): |
| 33 | + def __init__(self, url: str, token_id: str, token_secret: str, *args, **kwargs): |
| 34 | + super().__init__(*args, **kwargs) |
| 35 | + self.base_url = url |
| 36 | + self.auth = BookStackAuth(token_id, token_secret) |
| 37 | + self.rate_limit_reach = False |
| 38 | + |
| 39 | + def request(self, method, url_path, *args, **kwargs): |
| 40 | + while self.rate_limit_reach: |
| 41 | + sleep(1) |
| 42 | + |
| 43 | + url = urljoin(self.base_url, url_path) |
| 44 | + r = super().request(method, url, verify=False, *args, **kwargs) |
| 45 | + |
| 46 | + if r.status_code != 200: |
| 47 | + if r.status_code == 429: |
| 48 | + if not self.rate_limit_reach: |
| 49 | + logger.info("API rate limit reach, waiting...") |
| 50 | + self.rate_limit_reach = True |
| 51 | + sleep(60) |
| 52 | + self.rate_limit_reach = False |
| 53 | + logger.info("Done waiting for the API rate limit") |
| 54 | + return self.request(method, url, verify=False, *args, **kwargs) |
| 55 | + r.raise_for_status() |
| 56 | + return r |
| 57 | + |
| 58 | + def get_list(self, url: str, count: int = 500, sort: str = None, filters: Dict = None): |
| 59 | + # Add filter[...] to keys, avoiding the insertion of unwanted parameters |
| 60 | + if filters is not None: |
| 61 | + filters = {f"filter[{k}]": v for k, v in filters.items()} |
| 62 | + else: |
| 63 | + filters = {} |
| 64 | + |
| 65 | + data = [] |
| 66 | + records = 0 |
| 67 | + total = 1 # Set 1 to enter the loop |
| 68 | + while records < total: |
| 69 | + r = self.get(url, params={"count": count, "offset": records, "sort": sort, **filters}, |
| 70 | + headers={"Content-Type": "application/json"}) |
| 71 | + json = r.json() |
| 72 | + data += json.get("data") |
| 73 | + records = len(data) |
| 74 | + total = json.get("total") |
| 75 | + return data |
| 76 | + |
| 77 | + def get_all_books(self) -> List[Dict]: |
| 78 | + return self.get_list("/api/books", sort="+updated_at") |
| 79 | + |
| 80 | + def get_all_pages_from_book(self, book) -> List[Dict]: |
| 81 | + pages = self.get_list("/api/pages", sort="+updated_at", filters={"book_id": book["id"]}) |
| 82 | + |
| 83 | + # Add parent book object to each page |
| 84 | + for page in pages: |
| 85 | + page.update({"book": book}) |
| 86 | + |
| 87 | + return pages |
| 88 | + |
| 89 | + def get_page(self, page_id: int): |
| 90 | + r = self.get(f"/api/pages/{page_id}", headers={"Content-Type": "application/json"}) |
| 91 | + return r.json() |
| 92 | + |
| 93 | + def get_user(self, user_id: int): |
| 94 | + try: |
| 95 | + return self.get(f"/api/users/{user_id}", headers={"Content-Type": "application/json"}).json() |
| 96 | + # If the user lack the privileges to make this call, return None |
| 97 | + except HTTPError: |
| 98 | + return None |
| 99 | + |
| 100 | + |
| 101 | +class BookStackConfig(BaseModel): |
| 102 | + url: str |
| 103 | + token_id: str |
| 104 | + token_secret: str |
| 105 | + |
| 106 | + |
| 107 | +class BookstackDataSource(BaseDataSource): |
| 108 | + @staticmethod |
| 109 | + def get_config_fields() -> List[ConfigField]: |
| 110 | + return [ |
| 111 | + ConfigField(label="BookStack instance URL", name="url"), |
| 112 | + ConfigField(label="Token ID", name="token_id", input_type=HTMLInputType.PASSWORD), |
| 113 | + ConfigField(label="Token Secret", name="token_secret", input_type=HTMLInputType.PASSWORD) |
| 114 | + ] |
| 115 | + |
| 116 | + @classmethod |
| 117 | + def get_display_name(cls) -> str: |
| 118 | + return "BookStack" |
| 119 | + |
| 120 | + @staticmethod |
| 121 | + def list_books(book_stack: BookStack) -> List[Dict]: |
| 122 | + # Usually the book_stack connection fails, so we retry a few times |
| 123 | + retries = 3 |
| 124 | + for i in range(retries): |
| 125 | + try: |
| 126 | + return book_stack.get_all_books() |
| 127 | + except Exception as e: |
| 128 | + logging.error(f"BookStack connection failed: {e}") |
| 129 | + if i == retries - 1: |
| 130 | + raise e |
| 131 | + |
| 132 | + @staticmethod |
| 133 | + def validate_config(config: Dict) -> None: |
| 134 | + try: |
| 135 | + parsed_config = BookStackConfig(**config) |
| 136 | + book_stack = BookStack(url=parsed_config.url, token_id=parsed_config.token_id, |
| 137 | + token_secret=parsed_config.token_secret) |
| 138 | + BookstackDataSource.list_books(book_stack=book_stack) |
| 139 | + except Exception as e: |
| 140 | + raise InvalidDataSourceConfig from e |
| 141 | + |
| 142 | + def __init__(self, *args, **kwargs): |
| 143 | + super().__init__(*args, **kwargs) |
| 144 | + book_stack_config = BookStackConfig(**self._config) |
| 145 | + self._book_stack = BookStack(url=book_stack_config.url, token_id=book_stack_config.token_id, |
| 146 | + token_secret=book_stack_config.token_secret) |
| 147 | + |
| 148 | + def _list_books(self) -> List[Dict]: |
| 149 | + logger.info("Listing books with BookStack") |
| 150 | + return BookstackDataSource.list_books(book_stack=self._book_stack) |
| 151 | + |
| 152 | + def _feed_new_documents(self) -> None: |
| 153 | + logger.info("Feeding new documents with BookStack") |
| 154 | + |
| 155 | + books = self._list_books() |
| 156 | + raw_docs = [] |
| 157 | + for book in books: |
| 158 | + raw_docs.extend(self._list_book_pages(book)) |
| 159 | + |
| 160 | + parse_with_workers(self._parse_documents_worker, raw_docs) |
| 161 | + |
| 162 | + def _parse_documents_worker(self, raw_docs: List[Dict]): |
| 163 | + logger.info(f"Worker parsing {len(raw_docs)} documents") |
| 164 | + |
| 165 | + parsed_docs = [] |
| 166 | + total_fed = 0 |
| 167 | + for raw_page in raw_docs: |
| 168 | + last_modified = datetime.strptime(raw_page["updated_at"], "%Y-%m-%dT%H:%M:%S.%fZ") |
| 169 | + if last_modified < self._last_index_time: |
| 170 | + continue |
| 171 | + |
| 172 | + page_id = raw_page["id"] |
| 173 | + page_content = self._book_stack.get_page(page_id) |
| 174 | + author_name = page_content["created_by"]["name"] |
| 175 | + |
| 176 | + author_image_url = "" |
| 177 | + author = self._book_stack.get_user(raw_page["created_by"]) |
| 178 | + if author: |
| 179 | + author_image_url = author["avatar_url"] |
| 180 | + |
| 181 | + plain_text = html_to_text(page_content["html"]) |
| 182 | + |
| 183 | + url = urljoin(self._config.get('url'), f"/books/{raw_page['book_slug']}/page/{raw_page['slug']}") |
| 184 | + |
| 185 | + parsed_docs.append(BasicDocument(title=raw_page["name"], |
| 186 | + content=plain_text, |
| 187 | + author=author_name, |
| 188 | + author_image_url=author_image_url, |
| 189 | + timestamp=last_modified, |
| 190 | + id=page_id, |
| 191 | + data_source_id=self._data_source_id, |
| 192 | + location=raw_page["book"]["name"], |
| 193 | + url=url, |
| 194 | + type=DocumentType.DOCUMENT)) |
| 195 | + if len(parsed_docs) >= 50: |
| 196 | + total_fed += len(parsed_docs) |
| 197 | + IndexQueue.get_instance().put(docs=parsed_docs) |
| 198 | + parsed_docs = [] |
| 199 | + |
| 200 | + IndexQueue.get_instance().put(docs=parsed_docs) |
| 201 | + total_fed += len(parsed_docs) |
| 202 | + if total_fed > 0: |
| 203 | + logging.info(f"Worker fed {total_fed} documents") |
| 204 | + |
| 205 | + def _list_book_pages(self, book: Dict) -> List[Dict]: |
| 206 | + logger.info(f"Getting documents from book {book['name']} ({book['id']})") |
| 207 | + return self._book_stack.get_all_pages_from_book(book) |
| 208 | + |
| 209 | + |
| 210 | +# if __name__ == "__main__": |
| 211 | +# import os |
| 212 | +# config = {"url": os.environ["BOOKSTACK_URL"], "token_id": os.environ["BOOKSTACK_TOKEN_ID"], |
| 213 | +# "token_secret": os.environ["BOOKSTACK_TOKEN_SECRET"]} |
| 214 | +# book_stack = BookstackDataSource(config=config, data_source_id=0) |
| 215 | +# book_stack._feed_new_documents() |
0 commit comments