| 
13 | 13 | from tests.common.db.organizations import (  | 
14 | 14 |  OrganizationFactory,  | 
15 | 15 |  OrganizationManualActivationFactory,  | 
 | 16 | + OrganizationOIDCIssuerFactory,  | 
16 | 17 |  OrganizationRoleFactory,  | 
17 | 18 |  OrganizationStripeCustomerFactory,  | 
18 | 19 | )  | 
19 | 20 | from tests.common.db.subscriptions import StripeCustomerFactory  | 
20 | 21 | from warehouse.admin.views import organizations as views  | 
21 | 22 | from warehouse.organizations.models import (  | 
 | 23 | + OIDCIssuerType,  | 
22 | 24 |  OrganizationManualActivation,  | 
 | 25 | + OrganizationOIDCIssuer,  | 
23 | 26 |  OrganizationRole,  | 
24 | 27 |  OrganizationRoleType,  | 
25 | 28 |  OrganizationType,  | 
@@ -1736,3 +1739,317 @@ def test_set_total_size_limit_below_default(self, db_request):  | 
1736 | 1739 |  )  | 
1737 | 1740 |  ]  | 
1738 | 1741 |  assert result.status_code == 303  | 
 | 1742 | + | 
 | 1743 | + | 
 | 1744 | +class TestAddOIDCIssuer:  | 
 | 1745 | + def test_add_oidc_issuer_success(self, db_request, monkeypatch):  | 
 | 1746 | + organization = OrganizationFactory.create()  | 
 | 1747 | + admin_user = UserFactory.create(username="admin")  | 
 | 1748 | + | 
 | 1749 | + # Mock record_event  | 
 | 1750 | + record_event = pretend.call_recorder(lambda **kwargs: None)  | 
 | 1751 | + monkeypatch.setattr(organization, "record_event", record_event)  | 
 | 1752 | + | 
 | 1753 | + db_request.matchdict = {"organization_id": str(organization.id)}  | 
 | 1754 | + db_request.user = admin_user  | 
 | 1755 | + db_request.route_path = pretend.call_recorder(  | 
 | 1756 | + lambda *a, **kw: "/admin/organizations/"  | 
 | 1757 | + )  | 
 | 1758 | + db_request.session = pretend.stub(  | 
 | 1759 | + flash=pretend.call_recorder(lambda *a, **kw: None)  | 
 | 1760 | + )  | 
 | 1761 | + db_request.POST = MultiDict(  | 
 | 1762 | + {  | 
 | 1763 | + "issuer_type": "gitlab",  | 
 | 1764 | + "issuer_url": "https://gitlab.company.com",  | 
 | 1765 | + }  | 
 | 1766 | + )  | 
 | 1767 | + | 
 | 1768 | + result = views.add_oidc_issuer(db_request)  | 
 | 1769 | + | 
 | 1770 | + assert isinstance(result, HTTPSeeOther)  | 
 | 1771 | + assert result.location == "/admin/organizations/"  | 
 | 1772 | + | 
 | 1773 | + assert db_request.session.flash.calls == [  | 
 | 1774 | + pretend.call(  | 
 | 1775 | + "OIDC issuer 'https://gitlab.company.com' (gitlab) added to "  | 
 | 1776 | + f"'{organization.name}'",  | 
 | 1777 | + queue="success",  | 
 | 1778 | + )  | 
 | 1779 | + ]  | 
 | 1780 | + | 
 | 1781 | + issuer = db_request.db.query(OrganizationOIDCIssuer).one()  | 
 | 1782 | + assert issuer.issuer_type == OIDCIssuerType.GitLab  | 
 | 1783 | + assert issuer.issuer_url == "https://gitlab.company.com"  | 
 | 1784 | + assert issuer.organization == organization  | 
 | 1785 | + assert issuer.created_by == admin_user  | 
 | 1786 | + | 
 | 1787 | + # Check event was recorded  | 
 | 1788 | + assert record_event.calls == [  | 
 | 1789 | + pretend.call(  | 
 | 1790 | + request=db_request,  | 
 | 1791 | + tag="admin:organization:oidc_issuer:add",  | 
 | 1792 | + additional={  | 
 | 1793 | + "issuer_type": "gitlab",  | 
 | 1794 | + "issuer_url": "https://gitlab.company.com",  | 
 | 1795 | + },  | 
 | 1796 | + )  | 
 | 1797 | + ]  | 
 | 1798 | + | 
 | 1799 | + def test_add_oidc_issuer_invalid_form(self, db_request):  | 
 | 1800 | + organization = OrganizationFactory.create()  | 
 | 1801 | + admin_user = UserFactory.create(username="admin")  | 
 | 1802 | + | 
 | 1803 | + db_request.matchdict = {"organization_id": str(organization.id)}  | 
 | 1804 | + db_request.user = admin_user  | 
 | 1805 | + db_request.route_path = pretend.call_recorder(  | 
 | 1806 | + lambda *a, **kw: "/admin/organizations/"  | 
 | 1807 | + )  | 
 | 1808 | + db_request.session = pretend.stub(  | 
 | 1809 | + flash=pretend.call_recorder(lambda *a, **kw: None)  | 
 | 1810 | + )  | 
 | 1811 | + # Missing issuer_url  | 
 | 1812 | + db_request.POST = MultiDict({"issuer_type": "gitlab"})  | 
 | 1813 | + | 
 | 1814 | + result = views.add_oidc_issuer(db_request)  | 
 | 1815 | + assert isinstance(result, HTTPSeeOther)  | 
 | 1816 | + | 
 | 1817 | + # Should flash form validation errors  | 
 | 1818 | + assert len(db_request.session.flash.calls) > 0  | 
 | 1819 | + assert "error" in str(db_request.session.flash.calls[0])  | 
 | 1820 | + | 
 | 1821 | + def test_add_oidc_issuer_invalid_url(self, db_request):  | 
 | 1822 | + organization = OrganizationFactory.create()  | 
 | 1823 | + admin_user = UserFactory.create(username="admin")  | 
 | 1824 | + | 
 | 1825 | + db_request.matchdict = {"organization_id": str(organization.id)}  | 
 | 1826 | + db_request.user = admin_user  | 
 | 1827 | + db_request.route_path = pretend.call_recorder(  | 
 | 1828 | + lambda *a, **kw: "/admin/organizations/"  | 
 | 1829 | + )  | 
 | 1830 | + db_request.session = pretend.stub(  | 
 | 1831 | + flash=pretend.call_recorder(lambda *a, **kw: None)  | 
 | 1832 | + )  | 
 | 1833 | + # Invalid URL (not https)  | 
 | 1834 | + db_request.POST = MultiDict(  | 
 | 1835 | + {  | 
 | 1836 | + "issuer_type": "gitlab",  | 
 | 1837 | + "issuer_url": "http://gitlab.company.com",  | 
 | 1838 | + }  | 
 | 1839 | + )  | 
 | 1840 | + | 
 | 1841 | + result = views.add_oidc_issuer(db_request)  | 
 | 1842 | + assert isinstance(result, HTTPSeeOther)  | 
 | 1843 | + | 
 | 1844 | + # Should flash form validation errors  | 
 | 1845 | + flash_messages = [call.args[0] for call in db_request.session.flash.calls]  | 
 | 1846 | + assert any("https://" in msg for msg in flash_messages)  | 
 | 1847 | + | 
 | 1848 | + def test_add_oidc_issuer_duplicate(self, db_request, monkeypatch):  | 
 | 1849 | + organization = OrganizationFactory.create()  | 
 | 1850 | + admin_user = UserFactory.create(username="admin")  | 
 | 1851 | + | 
 | 1852 | + # Create existing issuer  | 
 | 1853 | + OrganizationOIDCIssuerFactory.create(  | 
 | 1854 | + organization=organization,  | 
 | 1855 | + issuer_type=OIDCIssuerType.GitLab,  | 
 | 1856 | + issuer_url="https://gitlab.company.com",  | 
 | 1857 | + created_by=admin_user,  | 
 | 1858 | + )  | 
 | 1859 | + | 
 | 1860 | + # Mock record_event (should not be called on duplicate)  | 
 | 1861 | + record_event = pretend.call_recorder(lambda **kwargs: None)  | 
 | 1862 | + monkeypatch.setattr(organization, "record_event", record_event)  | 
 | 1863 | + | 
 | 1864 | + db_request.matchdict = {"organization_id": str(organization.id)}  | 
 | 1865 | + db_request.user = admin_user  | 
 | 1866 | + db_request.route_path = pretend.call_recorder(  | 
 | 1867 | + lambda *a, **kw: "/admin/organizations/"  | 
 | 1868 | + )  | 
 | 1869 | + db_request.session = pretend.stub(  | 
 | 1870 | + flash=pretend.call_recorder(lambda *a, **kw: None)  | 
 | 1871 | + )  | 
 | 1872 | + db_request.POST = MultiDict(  | 
 | 1873 | + {  | 
 | 1874 | + "issuer_type": "gitlab",  | 
 | 1875 | + "issuer_url": "https://gitlab.company.com",  | 
 | 1876 | + }  | 
 | 1877 | + )  | 
 | 1878 | + | 
 | 1879 | + result = views.add_oidc_issuer(db_request)  | 
 | 1880 | + assert isinstance(result, HTTPSeeOther)  | 
 | 1881 | + | 
 | 1882 | + assert db_request.session.flash.calls == [  | 
 | 1883 | + pretend.call(  | 
 | 1884 | + "Issuer 'https://gitlab.company.com' already exists "  | 
 | 1885 | + f"for organization '{organization.name}'",  | 
 | 1886 | + queue="error",  | 
 | 1887 | + )  | 
 | 1888 | + ]  | 
 | 1889 | + | 
 | 1890 | + # No new event recorded  | 
 | 1891 | + assert record_event.calls == []  | 
 | 1892 | + | 
 | 1893 | + def test_add_oidc_issuer_organization_not_found(self, db_request):  | 
 | 1894 | + admin_user = UserFactory.create(username="admin")  | 
 | 1895 | + | 
 | 1896 | + db_request.matchdict = {  | 
 | 1897 | + "organization_id": "00000000-0000-0000-0000-000000000000"  | 
 | 1898 | + }  | 
 | 1899 | + db_request.user = admin_user  | 
 | 1900 | + | 
 | 1901 | + with pytest.raises(HTTPNotFound):  | 
 | 1902 | + views.add_oidc_issuer(db_request)  | 
 | 1903 | + | 
 | 1904 | + | 
 | 1905 | +class TestDeleteOIDCIssuer:  | 
 | 1906 | + def test_delete_oidc_issuer_success(self, db_request, monkeypatch):  | 
 | 1907 | + organization = OrganizationFactory.create()  | 
 | 1908 | + admin_user = UserFactory.create(username="admin")  | 
 | 1909 | + | 
 | 1910 | + issuer = OrganizationOIDCIssuerFactory.create(  | 
 | 1911 | + organization=organization,  | 
 | 1912 | + issuer_type=OIDCIssuerType.GitLab,  | 
 | 1913 | + issuer_url="https://gitlab.company.com",  | 
 | 1914 | + created_by=admin_user,  | 
 | 1915 | + )  | 
 | 1916 | + | 
 | 1917 | + # Mock record_event  | 
 | 1918 | + record_event = pretend.call_recorder(lambda **kwargs: None)  | 
 | 1919 | + monkeypatch.setattr(organization, "record_event", record_event)  | 
 | 1920 | + | 
 | 1921 | + db_request.matchdict = {  | 
 | 1922 | + "organization_id": str(organization.id),  | 
 | 1923 | + "issuer_id": str(issuer.id),  | 
 | 1924 | + }  | 
 | 1925 | + db_request.route_path = pretend.call_recorder(  | 
 | 1926 | + lambda *a, **kw: "/admin/organizations/"  | 
 | 1927 | + )  | 
 | 1928 | + db_request.session = pretend.stub(  | 
 | 1929 | + flash=pretend.call_recorder(lambda *a, **kw: None)  | 
 | 1930 | + )  | 
 | 1931 | + db_request.POST = MultiDict({"confirm": "https://gitlab.company.com"})  | 
 | 1932 | + | 
 | 1933 | + result = views.delete_oidc_issuer(db_request)  | 
 | 1934 | + | 
 | 1935 | + assert isinstance(result, HTTPSeeOther)  | 
 | 1936 | + assert result.location == "/admin/organizations/"  | 
 | 1937 | + | 
 | 1938 | + assert db_request.session.flash.calls == [  | 
 | 1939 | + pretend.call(  | 
 | 1940 | + "OIDC issuer 'https://gitlab.company.com' removed "  | 
 | 1941 | + f"from '{organization.name}'",  | 
 | 1942 | + queue="success",  | 
 | 1943 | + )  | 
 | 1944 | + ]  | 
 | 1945 | + | 
 | 1946 | + assert db_request.db.query(OrganizationOIDCIssuer).count() == 0  | 
 | 1947 | + | 
 | 1948 | + # Check event was recorded  | 
 | 1949 | + assert record_event.calls == [  | 
 | 1950 | + pretend.call(  | 
 | 1951 | + request=db_request,  | 
 | 1952 | + tag="admin:organization:oidc_issuer:delete",  | 
 | 1953 | + additional={  | 
 | 1954 | + "issuer_type": "gitlab",  | 
 | 1955 | + "issuer_url": "https://gitlab.company.com",  | 
 | 1956 | + },  | 
 | 1957 | + )  | 
 | 1958 | + ]  | 
 | 1959 | + | 
 | 1960 | + def test_delete_oidc_issuer_not_found(self, db_request):  | 
 | 1961 | + organization = OrganizationFactory.create()  | 
 | 1962 | + | 
 | 1963 | + db_request.matchdict = {  | 
 | 1964 | + "organization_id": str(organization.id),  | 
 | 1965 | + "issuer_id": "00000000-0000-0000-0000-000000000000",  | 
 | 1966 | + }  | 
 | 1967 | + db_request.route_path = pretend.call_recorder(  | 
 | 1968 | + lambda *a, **kw: "/admin/organizations/"  | 
 | 1969 | + )  | 
 | 1970 | + db_request.session = pretend.stub(  | 
 | 1971 | + flash=pretend.call_recorder(lambda *a, **kw: None)  | 
 | 1972 | + )  | 
 | 1973 | + db_request.POST = MultiDict({"confirm": "https://gitlab.company.com"})  | 
 | 1974 | + | 
 | 1975 | + result = views.delete_oidc_issuer(db_request)  | 
 | 1976 | + assert isinstance(result, HTTPSeeOther)  | 
 | 1977 | + | 
 | 1978 | + assert db_request.session.flash.calls == [  | 
 | 1979 | + pretend.call("This issuer does not exist", queue="error")  | 
 | 1980 | + ]  | 
 | 1981 | + | 
 | 1982 | + def test_delete_oidc_issuer_wrong_confirmation(self, db_request):  | 
 | 1983 | + organization = OrganizationFactory.create()  | 
 | 1984 | + admin_user = UserFactory.create(username="admin")  | 
 | 1985 | + | 
 | 1986 | + issuer = OrganizationOIDCIssuerFactory.create(  | 
 | 1987 | + organization=organization,  | 
 | 1988 | + issuer_type=OIDCIssuerType.GitLab,  | 
 | 1989 | + issuer_url="https://gitlab.company.com",  | 
 | 1990 | + created_by=admin_user,  | 
 | 1991 | + )  | 
 | 1992 | + | 
 | 1993 | + db_request.matchdict = {  | 
 | 1994 | + "organization_id": str(organization.id),  | 
 | 1995 | + "issuer_id": str(issuer.id),  | 
 | 1996 | + }  | 
 | 1997 | + db_request.route_path = pretend.call_recorder(  | 
 | 1998 | + lambda *a, **kw: "/admin/organizations/"  | 
 | 1999 | + )  | 
 | 2000 | + db_request.session = pretend.stub(  | 
 | 2001 | + flash=pretend.call_recorder(lambda *a, **kw: None)  | 
 | 2002 | + )  | 
 | 2003 | + db_request.POST = MultiDict({"confirm": "https://wrong-url.com"})  | 
 | 2004 | + | 
 | 2005 | + result = views.delete_oidc_issuer(db_request)  | 
 | 2006 | + assert isinstance(result, HTTPSeeOther)  | 
 | 2007 | + | 
 | 2008 | + assert db_request.session.flash.calls == [  | 
 | 2009 | + pretend.call("Confirm the request", queue="error")  | 
 | 2010 | + ]  | 
 | 2011 | + | 
 | 2012 | + # Issuer should still exist  | 
 | 2013 | + assert db_request.db.query(OrganizationOIDCIssuer).count() == 1  | 
 | 2014 | + | 
 | 2015 | + def test_delete_oidc_issuer_no_confirmation(self, db_request):  | 
 | 2016 | + organization = OrganizationFactory.create()  | 
 | 2017 | + admin_user = UserFactory.create(username="admin")  | 
 | 2018 | + | 
 | 2019 | + issuer = OrganizationOIDCIssuerFactory.create(  | 
 | 2020 | + organization=organization,  | 
 | 2021 | + issuer_type=OIDCIssuerType.GitLab,  | 
 | 2022 | + issuer_url="https://gitlab.company.com",  | 
 | 2023 | + created_by=admin_user,  | 
 | 2024 | + )  | 
 | 2025 | + | 
 | 2026 | + db_request.matchdict = {  | 
 | 2027 | + "organization_id": str(organization.id),  | 
 | 2028 | + "issuer_id": str(issuer.id),  | 
 | 2029 | + }  | 
 | 2030 | + db_request.route_path = pretend.call_recorder(  | 
 | 2031 | + lambda *a, **kw: "/admin/organizations/"  | 
 | 2032 | + )  | 
 | 2033 | + db_request.session = pretend.stub(  | 
 | 2034 | + flash=pretend.call_recorder(lambda *a, **kw: None)  | 
 | 2035 | + )  | 
 | 2036 | + db_request.POST = MultiDict({})  | 
 | 2037 | + | 
 | 2038 | + result = views.delete_oidc_issuer(db_request)  | 
 | 2039 | + assert isinstance(result, HTTPSeeOther)  | 
 | 2040 | + | 
 | 2041 | + assert db_request.session.flash.calls == [  | 
 | 2042 | + pretend.call("Confirm the request", queue="error")  | 
 | 2043 | + ]  | 
 | 2044 | + | 
 | 2045 | + # Issuer should still exist  | 
 | 2046 | + assert db_request.db.query(OrganizationOIDCIssuer).count() == 1  | 
 | 2047 | + | 
 | 2048 | + def test_delete_oidc_issuer_organization_not_found(self, db_request):  | 
 | 2049 | + db_request.matchdict = {  | 
 | 2050 | + "organization_id": "00000000-0000-0000-0000-000000000000",  | 
 | 2051 | + "issuer_id": "00000000-0000-0000-0000-000000000001",  | 
 | 2052 | + }  | 
 | 2053 | + | 
 | 2054 | + with pytest.raises(HTTPNotFound):  | 
 | 2055 | + views.delete_oidc_issuer(db_request)  | 
0 commit comments