Skip to content
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- name: Install dev dependencies
run: python -m pip install -r requirements/dev.txt
- name: Run linting
run: python -m tox -e lint
run: python -m tox -e lint,mypy,mypy-samples-image,mypy-samples-json

test:
strategy:
Expand Down
2 changes: 1 addition & 1 deletion cloudevents/abstract/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class CloudEvent:
@classmethod
def create(
cls: typing.Type[AnyCloudEvent],
attributes: typing.Dict[str, typing.Any],
attributes: typing.Mapping[str, typing.Any],
data: typing.Optional[typing.Any],
) -> AnyCloudEvent:
"""
Expand Down
6 changes: 4 additions & 2 deletions cloudevents/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ def from_json(

def from_http(
event_type: typing.Type[AnyCloudEvent],
headers: typing.Mapping[str, str],
headers: typing.Union[
typing.Mapping[str, str], types.SupportsDuplicateItems[str, str]
],
data: typing.Optional[typing.Union[str, bytes]],
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> AnyCloudEvent:
Expand Down Expand Up @@ -260,7 +262,7 @@ def best_effort_encode_attribute_value(value: typing.Any) -> typing.Any:

def from_dict(
event_type: typing.Type[AnyCloudEvent],
event: typing.Dict[str, typing.Any],
event: typing.Mapping[str, typing.Any],
) -> AnyCloudEvent:
"""
Constructs an Event object of a given `event_type` from
Expand Down
6 changes: 4 additions & 2 deletions cloudevents/http/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def from_json(


def from_http(
headers: typing.Dict[str, str],
headers: typing.Union[
typing.Mapping[str, str], types.SupportsDuplicateItems[str, str]
],
data: typing.Optional[typing.Union[str, bytes]],
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> CloudEvent:
Expand All @@ -58,7 +60,7 @@ def from_http(


def from_dict(
event: typing.Dict[str, typing.Any],
event: typing.Mapping[str, typing.Any],
) -> CloudEvent:
"""
Constructs a CloudEvent from a dict `event` representation.
Expand Down
6 changes: 4 additions & 2 deletions cloudevents/http/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ class CloudEvent(abstract.CloudEvent):

@classmethod
def create(
cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any]
cls,
attributes: typing.Mapping[str, typing.Any],
data: typing.Optional[typing.Any],
) -> "CloudEvent":
return cls(attributes, data)

def __init__(self, attributes: typing.Dict[str, str], data: typing.Any = None):
def __init__(self, attributes: typing.Mapping[str, str], data: typing.Any = None):
"""
Event Constructor
:param attributes: a dict with cloudevent attributes. Minimally
Expand Down
48 changes: 44 additions & 4 deletions cloudevents/kafka/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,29 @@ def to_binary(
return KafkaMessage(headers, message_key, data)


@typing.overload
def from_binary(
message: KafkaMessage,
event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
event_type: None = None,
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> http.CloudEvent:
pass


@typing.overload
def from_binary(
message: KafkaMessage,
event_type: typing.Type[AnyCloudEvent],
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> AnyCloudEvent:
pass


def from_binary(
message: KafkaMessage,
event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> typing.Union[http.CloudEvent, AnyCloudEvent]:
"""
Returns a CloudEvent from a KafkaMessage in binary format.

Expand Down Expand Up @@ -144,10 +162,11 @@ def from_binary(
raise cloud_exceptions.DataUnmarshallerError(
f"Failed to unmarshall data with error: {type(e).__name__}('{e}')"
)
result: typing.Union[http.CloudEvent, AnyCloudEvent]
if event_type:
result = event_type.create(attributes, data)
else:
result = http.CloudEvent.create(attributes, data) # type: ignore
result = http.CloudEvent.create(attributes, data)
return result


Expand Down Expand Up @@ -210,12 +229,32 @@ def to_structured(
return KafkaMessage(headers, message_key, value)


@typing.overload
def from_structured(
message: KafkaMessage,
event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
event_type: None = None,
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> http.CloudEvent:
pass


@typing.overload
def from_structured(
message: KafkaMessage,
event_type: typing.Type[AnyCloudEvent],
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> AnyCloudEvent:
pass


def from_structured(
message: KafkaMessage,
event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> typing.Union[http.CloudEvent, AnyCloudEvent]:
"""
Returns a CloudEvent from a KafkaMessage in structured format.

Expand Down Expand Up @@ -264,8 +303,9 @@ def from_structured(
attributes["datacontenttype"] = val.decode()
else:
attributes[header.lower()] = val.decode()
result: typing.Union[AnyCloudEvent, http.CloudEvent]
if event_type:
result = event_type.create(attributes, data)
else:
result = http.CloudEvent.create(attributes, data) # type: ignore
result = http.CloudEvent.create(attributes, data)
return result
6 changes: 4 additions & 2 deletions cloudevents/pydantic/v1/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@


def from_http(
headers: typing.Dict[str, str],
headers: typing.Union[
typing.Mapping[str, str], types.SupportsDuplicateItems[str, str]
],
data: typing.Optional[typing.AnyStr],
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> CloudEvent:
Expand Down Expand Up @@ -63,7 +65,7 @@ def from_json(


def from_dict(
event: typing.Dict[str, typing.Any],
event: typing.Mapping[str, typing.Any],
) -> CloudEvent:
"""
Construct an CloudEvent from a dict `event` representation.
Expand Down
6 changes: 4 additions & 2 deletions cloudevents/pydantic/v1/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ class CloudEvent(abstract.CloudEvent, BaseModel): # type: ignore

@classmethod
def create(
cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any]
cls,
attributes: typing.Mapping[str, typing.Any],
data: typing.Optional[typing.Any],
) -> "CloudEvent":
return cls(attributes, data)

Expand Down Expand Up @@ -155,7 +157,7 @@ def create(

def __init__( # type: ignore[no-untyped-def]
self,
attributes: typing.Optional[typing.Dict[str, typing.Any]] = None,
attributes: typing.Optional[typing.Mapping[str, typing.Any]] = None,
data: typing.Optional[typing.Any] = None,
**kwargs,
):
Expand Down
6 changes: 4 additions & 2 deletions cloudevents/pydantic/v2/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@


def from_http(
headers: typing.Dict[str, str],
headers: typing.Union[
typing.Mapping[str, str], types.SupportsDuplicateItems[str, str]
],
data: typing.Optional[typing.AnyStr],
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> CloudEvent:
Expand Down Expand Up @@ -64,7 +66,7 @@ def from_json(


def from_dict(
event: typing.Dict[str, typing.Any],
event: typing.Mapping[str, typing.Any],
) -> CloudEvent:
"""
Construct an CloudEvent from a dict `event` representation.
Expand Down
8 changes: 6 additions & 2 deletions cloudevents/pydantic/v2/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ class CloudEvent(abstract.CloudEvent, BaseModel): # type: ignore

@classmethod
def create(
cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any]
cls,
attributes: typing.Mapping[str, typing.Any],
data: typing.Optional[typing.Any],
) -> "CloudEvent":
return cls(attributes, data)

Expand Down Expand Up @@ -103,7 +105,7 @@ def create(

def __init__( # type: ignore[no-untyped-def]
self,
attributes: typing.Optional[typing.Dict[str, typing.Any]] = None,
attributes: typing.Optional[typing.Mapping[str, typing.Any]] = None,
data: typing.Optional[typing.Any] = None,
**kwargs,
):
Expand Down Expand Up @@ -173,6 +175,8 @@ def model_validate_json(
*,
strict: typing.Optional[bool] = None,
context: typing.Optional[typing.Dict[str, Any]] = None,
by_alias: typing.Optional[bool] = None,
by_name: typing.Optional[bool] = None,
) -> "CloudEvent":
return conversion.from_json(cls, json_data)

Expand Down
23 changes: 14 additions & 9 deletions cloudevents/sdk/event/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import typing

from cloudevents.sdk.event import base, opt

if typing.TYPE_CHECKING:
from typing_extensions import Self


class Event(base.BaseEvent):
_ce_required_fields = {"id", "source", "type", "specversion"}
Expand Down Expand Up @@ -79,39 +84,39 @@ def Extensions(self) -> dict:
return {}
return dict(result)

def SetEventType(self, eventType: str) -> base.BaseEvent:
def SetEventType(self, eventType: str) -> Self:
self.Set("type", eventType)
return self

def SetSource(self, source: str) -> base.BaseEvent:
def SetSource(self, source: str) -> Self:
self.Set("source", source)
return self

def SetEventID(self, eventID: str) -> base.BaseEvent:
def SetEventID(self, eventID: str) -> Self:
self.Set("id", eventID)
return self

def SetEventTime(self, eventTime: typing.Optional[str]) -> base.BaseEvent:
def SetEventTime(self, eventTime: typing.Optional[str]) -> Self:
self.Set("time", eventTime)
return self

def SetSubject(self, subject: typing.Optional[str]) -> base.BaseEvent:
def SetSubject(self, subject: typing.Optional[str]) -> Self:
self.Set("subject", subject)
return self

def SetSchema(self, schema: typing.Optional[str]) -> base.BaseEvent:
def SetSchema(self, schema: typing.Optional[str]) -> Self:
self.Set("dataschema", schema)
return self

def SetContentType(self, contentType: typing.Optional[str]) -> base.BaseEvent:
def SetContentType(self, contentType: typing.Optional[str]) -> Self:
self.Set("datacontenttype", contentType)
return self

def SetData(self, data: typing.Optional[object]) -> base.BaseEvent:
def SetData(self, data: typing.Optional[object]) -> Self:
self.Set("data", data)
return self

def SetExtensions(self, extensions: typing.Optional[dict]) -> base.BaseEvent:
def SetExtensions(self, extensions: typing.Optional[dict]) -> Self:
self.Set("extensions", extensions)
return self

Expand Down
16 changes: 16 additions & 0 deletions cloudevents/sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,25 @@

import typing

_K_co = typing.TypeVar("_K_co", covariant=True)
_V_co = typing.TypeVar("_V_co", covariant=True)

# Use consistent types for marshal and unmarshal functions across
# both JSON and Binary format.

MarshallerType = typing.Callable[[typing.Any], typing.AnyStr]

UnmarshallerType = typing.Callable[[typing.AnyStr], typing.Any]


class SupportsDuplicateItems(typing.Protocol[_K_co, _V_co]):
"""
Dict-like objects with an items() method that may produce duplicate keys.
"""

# This is wider than _typeshed.SupportsItems, which expects items() to
# return type an AbstractSet. werkzeug's Headers class satisfies this type,
# but not _typeshed.SupportsItems.

def items(self) -> typing.Iterable[typing.Tuple[_K_co, _V_co]]:
pass
6 changes: 3 additions & 3 deletions cloudevents/tests/test_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
def test_binary_converter_raise_unsupported():
with pytest.raises(exceptions.UnsupportedEvent):
cnvtr = binary.BinaryHTTPCloudEventConverter()
cnvtr.read(None, {}, None, None)
cnvtr.read(None, {}, None, None) # type: ignore[arg-type] # intentionally wrong type # noqa: E501


def test_base_converters_raise_exceptions():
Expand All @@ -35,8 +35,8 @@ def test_base_converters_raise_exceptions():

with pytest.raises(Exception):
cnvtr = base.Converter()
cnvtr.write(None, None)
cnvtr.write(None, None) # type: ignore[arg-type] # intentionally wrong type

with pytest.raises(Exception):
cnvtr = base.Converter()
cnvtr.read(None, None, None, None)
cnvtr.read(None, None, None, None) # type: ignore[arg-type] # intentionally wrong type # noqa: E501
2 changes: 1 addition & 1 deletion cloudevents/tests/test_event_from_request_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
@pytest.mark.parametrize("event_class", [v03.Event, v1.Event])
def test_binary_converter_upstream(event_class):
m = marshaller.NewHTTPMarshaller([binary.NewBinaryHTTPCloudEventConverter()])
event = m.FromRequest(event_class(), data.headers[event_class], None, lambda x: x)
event = m.FromRequest(event_class(), data.headers[event_class], b"", lambda x: x)
assert event is not None
assert event.EventType() == data.ce_type
assert event.EventID() == data.ce_id
Expand Down
2 changes: 1 addition & 1 deletion cloudevents/tests/test_event_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_object_event_v1():
_, structured_body = m.ToRequest(event)
assert isinstance(structured_body, bytes)
structured_obj = json.loads(structured_body)
error_msg = f"Body was {structured_body}, obj is {structured_obj}"
error_msg = f"Body was {structured_body!r}, obj is {structured_obj}"
assert isinstance(structured_obj, dict), error_msg
assert isinstance(structured_obj["data"], dict), error_msg
assert len(structured_obj["data"]) == 1, error_msg
Expand Down
Loading
Loading