Source code for pwncollege.pwncollege

from __future__ import annotations

import atexit
import getpass
import json
import os
import re
from typing import List, Optional, cast, TYPE_CHECKING

import requests

from .constants import SITE_BASE, USER_AGENT
from .errors import (
    AuthenticationException,
    ServerErrorException,
)
from .utils import cookie_expired, parse_csrf_token

if TYPE_CHECKING:
    from .user import User
    from .challenge import Challenge
    from .leaderboard import Leaderboard

class colors:
    """ANSI color codes for use in terminal output

    Examples:
        Using colors::
            print(colors.red + "Hello, world!" + colors.reset)

    """
    def __init__(self):
        pass
    red = "\033[91m"
    green = "\033[92m"
    yellow = "\033[93m"
    blue = "\033[94m"
    reset = "\033[0m"

[docs] class PWNClient: """The client via which API requests are made Examples: Connecting to the API:: from pwncollege import PWNClient client = PWNClient(email="user@example.com", password="S3cr3tP455w0rd!") client = PWNClient(email="unique1234", password="S3cr3tP455w0rd!") value for email can also be username """ _user: Optional["User"] = None _app_cookie: Optional[str] = None _site_base: str nonce: Optional[str] = None session: requests.Session = requests.Session() def __init__( self, email: Optional[str] = None, password: Optional[str] = None, cache: Optional[str] = None, site_base: str = SITE_BASE, app_cookie: Optional[str] = None, notif: bool = True, ): """ Authenticates to the API. If `cache` is set, the client will attempt to load access tokens from the given path. If they cannot be found, or are expired, normal API authentication will take place, and the tokens will be dumped to the file for the next launch. Args: email: The authenticating user's email address password: The authenticating user's password cache: The path to load/store access tokens from/to app_cookie: Authenticate using a provided App Cookie """ self._site_base = site_base if cache is not None: if self.load_from_cache(cache, notif=notif) is False: print(f"{colors.yellow}[!] Failed to load from cache, logging in normally{colors.reset}") self.do_login(email, password, app_cookie) self.dump_to_cache(cache) # Make sure we dump our current tokens out when we exit atexit.register(self.dump_to_cache, cache) else: self.do_login(email, password, app_cookie)
[docs] def do_request( self, endpoint, json_data=None, data=None, post=False, nonce: Optional[str] = None, patch: bool = False, ) -> requests.Response: """ Args: endpoint: The API/normal endpoint to request json_data: Data to be sent in JSON format data: Data to be sent in application/x-www-form-urlencoded format post: Force POST request Returns: The JSON response from the API or the raw response if not JSON """ headers = { "User-Agent": USER_AGENT, "Csrf-Token": nonce, } if patch: r = self.session.patch( self._site_base + endpoint, json=json_data, data=data, headers=headers, ) elif not json_data and not data: if post: r = self.session.post( self._site_base + endpoint, headers=headers ) else: r = self.session.get( self._site_base + endpoint, headers=headers ) else: r = self.session.post( self._site_base + endpoint, json=json_data, data=data, headers=headers, ) if r.status_code >= 500: raise ServerErrorException(f"Server error: {r.status_code}") else: return r
[docs] def load_from_cache(self, cache: str, notif: bool = True) -> bool: """ Args: cache: The cache file path Returns: Whether loading from the cache was successful """ if not os.path.exists(cache): return False with open(cache, "r") as f: try: data = json.load(f) except json.JSONDecodeError: return False self._app_cookie = data.get("app_cookie") if self._app_cookie is None or cookie_expired(self._app_cookie): return False self.session.cookies.set("session", self._app_cookie) if notif: print(f"{colors.green}[+]{colors.reset} Loaded from cache!") return True
[docs] def dump_to_cache(self, cache: str): """ Dumps the current access and refresh tokens to a file Args: cache: The path to the cache file """ if not os.path.exists(os.path.dirname(cache)): return with open(cache, "w") as f: json.dump( { "app_cookie": self._app_cookie, }, f, )
[docs] def do_login( self, email: Optional[str] = None, password: Optional[str] = None, app_cookie: Optional[str] = None, ): """ Authenticates against the API. If credentials are not provided, they will be prompted for. """ if app_cookie is not None: self._app_cookie = app_cookie return self._app_cookie if email is None: email = input(colors.blue + "Enter your username or email? " + colors.reset) if password is None: password = getpass.getpass() if self.nonce is None: first = self.do_request("login") self.nonce = parse_csrf_token(first.text) print(f"{colors.green}[+] CSRF token: {colors.reset}{colors.blue}{self.nonce}{colors.reset}") data = self.do_request( "login", data={ "name": email, "password": password, "_submit": "Submit", "nonce": self.nonce, } ) if "Your username or password is incorrect" not in data.text: print(f"{colors.green}[+] Logged in as {colors.reset}{colors.blue}{email}!{colors.reset}") self._app_cookie = self.session.cookies.values()[0] return self._app_cookie else: raise AuthenticationException("Incorrect username or password")
[docs] def get_dojos(self) -> List[str]: """Requests a list of available dojos Returns: A list of dojos """ data = self.do_request("pwncollege_api/v1/dojos").json() if data["success"] is False: return [] return data["dojos"]
[docs] def get_modules(self, dojo: str) -> List[str]: """Requests a list of available modules in a dojo Args: dojo: The dojo to fetch modules from Returns: A list of modules """ data = self.do_request(f"pwncollege_api/v1/dojos/{dojo}/modules") if data.status_code == 404: print(f"{colors.red}[!] Dojo {dojo} does not exist!{colors.reset}") return [] if data.json()["success"]: return data.json()["modules"] return []
# noinspection PyUnresolvedReferences
[docs] def get_challenges(self, dojo: str, module: str) -> List["Challenge"]: """Requests a list of `Challenge` from the API in a module Args: dojo: The dojo to fetch challenges from module: The module to fetch challenges from Returns: A list of `Challenge` """ data = self.do_request(f"pwncollege_api/v1/dojos/{dojo}/modules") if data.status_code == 404: print(f"{colors.red}[!] Dojo {dojo} does not exist!{colors.reset}") data = data.json() if data["success"] is False: return [] for item in data["modules"]: if item["id"].lower() == module.lower(): return item["challenges"] print(f"{colors.red}[!] Module {module} does not exist in dojo {dojo}!{colors.reset}") return []
# noinspection PyUnresolvedReferences
[docs] def get_challenge_ids(self, dojo: str, module: str) -> dict[str, int]: """Requests a list of `Challenge` IDs from the API in a module Args: dojo: The dojo to fetch challenges from module: The module to fetch challenges from Returns: A list of `Challenge` """ data = self.do_request(f"{dojo}/{module}") if data.status_code == 404: print(f"{colors.red}[!] Dojo {dojo} or module {module} does not exist!{colors.reset}") pairs = re.findall( r'<input id="challenge"[^>]*value="([^"]+)">.*?<input id="challenge-id"[^>]*value="([^"]+)"', data.text, re.DOTALL ) mapping = {name: int(cid) for name, cid in pairs} return mapping
[docs] def create_challenge(self, dojo: str, module: str, chall: Challenge) -> "Challenge": """Creates a `Challenge` object from needed info""" from .challenge import Challenge data = { "id": chall.id, "dojo": dojo, "module": module, "challenge_id": chall.challenge_id, } return Challenge(data, self)
# noinspection PyUnresolvedReferences
[docs] def get_user(self, user_id: int) -> "User": """ Args: user_id: The platform ID of the `User` to fetch Returns: The requested `User` """ from .user import User text = self.do_request(f"/hacker/{user_id}").text name_re = re.search("<h1.*>(.*)</h1>", text) name = name_re.group(1) if name_re else "" r = self.do_request("/pwncollege_api/v1/score?username=" + name) if "user is not ranked" in r.text: score = ["∞", "0"] else: score = r.text.strip('"').split(":") country_name = re.search("<i class=\"flag-.*\"><\/i>\n(.*)\n", text) country = country_name.group(1).strip() if country_name else None university_name = re.search("<span class=\"badge badge-primary\">(.*)<\/span>", text) university = university_name.group(1).strip() if university_name else None belt_re = re.search("<img src=\"/belt/(.*).svg\",? class=\"scoreboard-belt\">", text) belt = belt_re.group(1) if belt_re else None website_re = re.search("<a href=\"(.*)\" target=\"_blank\" style=\"color: inherit;\" rel=\"noopener\">", text) website = website_re.group(1) if website_re else None data = cast(dict, { "id": user_id, "name": name, "ranking": score[0], "points": score[1], "website": website, "country": country, "belt": belt, "university": university, }) return User(data, self)
# noinspection PyUnresolvedReferences
[docs] def get_dojo_ranking(self, dojo: str, duration: int = 0, page: int = 1) -> "Leaderboard": """ Returns: A Leaderboard of the top 20 Users in the Dojo """ from .leaderboard import Leaderboard r = self.do_request(f"pwncollege_api/v1/scoreboard/{dojo}/_/{duration}/{page}") if r.status_code == 404: print(f"{colors.red}[!] Dojo {dojo} does not exist!{colors.reset}") data = cast(dict, r.json())["standings"] return Leaderboard(data, self)
# noinspection PyUnresolvedReferences
[docs] def get_module_ranking(self, dojo: str, module: str, duration: int = 0, page: int = 1) -> "Leaderboard": """ Returns: A Leaderboard of the top 20 Users in the Module """ from .leaderboard import Leaderboard r = self.do_request(f"pwncollege_api/v1/scoreboard/{dojo}/{module}/{duration}/{page}") if r.status_code == 404: print(f"{colors.red}[!] Dojo {dojo} or module {module} does not exist!{colors.reset}") data = cast(dict, r.json())["standings"] return Leaderboard(data, self)
[docs] def get_belts(self) -> dict: """Requests a list of available belts Returns: A list of belts """ data = self.do_request("pwncollege_api/v1/belts").json() return cast(dict, data)
# noinspection PyUnresolvedReferences @property def user(self) -> "User": """ Returns: The `User` associated with the current `PWNClient` """ match = re.search("'userId': (\\d+)", self.do_request("/").text) assert match, "Failed to find User ID" uid = int(match.group(1)) if not self._user: self._user = self.get_user(uid) return self._user
class PWNObject: """Base class of all API objects Attributes: id: The ID of the associated object """ _client: PWNClient id: str def __eq__(self, other): return self.id == other.id and type(self) == type(other)