|
1 | 1 | # eb's impl
|
2 | 2 |
|
class URLParseError(Exception):
    """Error raised when a URL string cannot be parsed.

    The human-readable reason is stored on ``message``.  It is also handed
    to ``Exception.__init__`` so that ``args`` is populated; without that,
    ``repr()`` shows no detail and the exception cannot be unpickled
    (unpickling re-invokes the class with ``args``).
    """

    def __init__(self, message):
        # Two-argument super() keeps this working on Python 2 as well.
        super(URLParseError, self).__init__(message)
        self.message = message

    def __str__(self):
        return "URL parse error: {}".format(self.message)
| 9 | + |
| 10 | + |
def url_parse(*args):
    """Split a URL string into a dictionary of its components.

    The first positional argument is the URL.  When called with no
    arguments, a dictionary with every component set to ``None`` is
    returned.

    Returned keys:
        scheme   -- lower-cased text before the first "://"
        userinfo -- text before the first "@" of the authority, or None
        host     -- host name (a bracketed IPv6 literal keeps its brackets)
        port     -- explicit port as an int; otherwise 80 for "http",
                    443 for "https", and None for any other scheme
        path     -- always starts with "/"
        query    -- text after the first "?" (before any "#"), or None
        fragment -- text after the first "#", or None

    Raises:
        URLParseError: if the URL has no "://", if nothing of the form
            "/..." follows the authority (reported as "Missing authority"),
            or if the port is not made of digits only.
    """
    ret = {"scheme": None, "host": None, "path": None, "port": None,
           "fragment": None, "query": None, "userinfo": None}

    if len(args) == 0:
        return ret
    url = args[0]

    if "://" not in url:
        raise URLParseError("Missing scheme")

    # Split on the FIRST "://" only: a later "://" (for example inside a
    # query value such as "?next=http://x") belongs to the rest of the URL.
    scheme, rest = url.split("://", 1)
    ret["scheme"] = scheme.lower()
    ret["port"] = {"http": 80, "https": 443}.get(ret["scheme"])

    # A URL without any "/" after the scheme is rejected.
    if "/" not in rest:
        raise URLParseError("Missing authority")

    authority, tail = rest.split("/", 1)
    tail = "/" + tail

    # Userinfo is everything before the first "@".  Splitting (rather than
    # str.replace) guarantees only that leading prefix is removed.
    if "@" in authority:
        userinfo, host_port = authority.split("@", 1)
    else:
        userinfo, host_port = None, authority

    if host_port.startswith("[") and "]" in host_port:
        # Bracketed IPv6 literal: colons inside the brackets are part of
        # the host, so only text after "]" can carry a port.
        end = host_port.index("]") + 1
        host, after = host_port[:end], host_port[end:]
        if after and not after.startswith(":"):
            raise URLParseError("Invalid port: {}".format(after))
        port = after[1:] if after else None
    elif ":" in host_port:
        # Everything after the first ":" must be the port; extra colons
        # make it non-numeric and are rejected below instead of dropped.
        host, port = host_port.split(":", 1)
    else:
        host, port = host_port, None

    if port is not None:
        if not port.isdigit():
            raise URLParseError("Invalid port: {}".format(port))
        ret["port"] = int(port)

    # Peel the fragment off first, then the query: a "?" inside the
    # fragment is not a query delimiter, and later "?" / "#" characters
    # belong to the query / fragment text respectively.
    if "#" in tail:
        tail, ret["fragment"] = tail.split("#", 1)
    if "?" in tail:
        tail, ret["query"] = tail.split("?", 1)

    ret["host"] = host
    ret["userinfo"] = userinfo
    ret["path"] = tail

    return ret
| 51 | + |
| 52 | + |
def url_join(*args):
    """Assemble a URL string from a dictionary of URL parts.

    The first positional argument is a dictionary with the required keys
    "scheme", "host" and "path", and the optional keys "userinfo", "port",
    "query" and "fragment".  Optional parts that are missing or falsy are
    left out.  With no argument, or a falsy one (``None``, ``{}``), an
    empty string is returned.

    The port is written unless it equals the scheme's default (80 for
    "http", 443 for "https"); schemes without a known default always keep
    an explicit port.  A ``None`` path is treated as an empty path.

    Raises:
        KeyError: if "scheme", "host" or "path" is absent from the dict.
    """
    in_dict = args[0] if args else None
    if not in_dict:
        return ""

    scheme = in_dict["scheme"]

    _userinfo = in_dict.get("userinfo")
    userinfo = "{}@".format(_userinfo) if _userinfo else ""

    _query = in_dict.get("query")
    query = "?{}".format(_query) if _query else ""

    _fragment = in_dict.get("fragment")
    fragment = "#{}".format(_fragment) if _fragment else ""

    # Only a port equal to the scheme's default is redundant.  Compare as
    # text so that 443 and "443" are treated alike, and look the scheme up
    # case-insensitively.
    default_port = {"http": 80, "https": 443}.get(str(scheme).lower())
    _port = in_dict.get("port")
    port = ""
    if _port and (default_port is None or str(_port) != str(default_port)):
        port = ":{}".format(_port)

    # Avoid rendering a missing path as the literal text "None".
    path = in_dict["path"]
    if path is None:
        path = ""

    return "{scheme}://{userinfo}{host}{port}{path}{query}{fragment}".format(
        scheme=scheme, userinfo=userinfo,
        host=in_dict["host"], port=port, path=path,
        query=query, fragment=fragment
    )
| 81 | + |
# Test wiring: expose this file's implementations as attributes of the
# `fixture` module, then star-import the test suite.
# NOTE(review): presumably `tests` looks the functions up on `fixture`;
# confirm against the tests module.
import fixture
setattr(fixture, "url_parse", url_parse)
setattr(fixture, "url_join", url_join)
from tests import *
0 commit comments