Assume that you get the following JSON schema specification, saved in `schema_def.json`:

```json
{
  "title": "Customer",
  "description": "Schema for Customer data",
  "type": "object",
  "properties": {
    "name": {
      "type": "string",
      "description": "Customer name"
    },
    "age": {
      "type": "integer",
      "description": "Customer age"
    }
  },
  "required": ["name"]
}
```
Naturally, when reading data that adheres to this definition, you'd like a programmatic way to obtain the corresponding `StructType` object.
Based on my research, this is a little tricky: there doesn't seem to be a way to do it nicely (i.e., with the help of a well-accepted package).
So, here is a little code snippet that you can use to achieve what you need:
```python
import json

from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DoubleType,
    BooleanType,
    ArrayType,
    LongType,
    NullType,
)


def json_schema_to_pyspark_struct(schema: dict) -> StructType:
    """
    Converts a JSON schema dictionary to a PySpark StructType.

    This function recursively parses the JSON schema and maps its types
    to the corresponding PySpark SQL data types.

    Args:
        schema: A dictionary representing the JSON schema.

    Returns:
        A PySpark StructType object representing the schema.

    Raises:
        ValueError: If an unsupported JSON schema type is encountered.
    """
    # Mapping from JSON schema types to PySpark data types
    type_mapping = {
        "string": StringType(),
        "number": DoubleType(),  # More general than FloatType
        "integer": LongType(),   # More general than IntegerType
        "boolean": BooleanType(),
        "null": NullType(),
    }

    fields = []
    # Get the list of required fields, if specified
    required_fields = schema.get("required", [])

    # Iterate over each property defined in the schema
    for property_name, property_details in schema.get("properties", {}).items():
        field_type_str = property_details.get("type")
        is_nullable = property_name not in required_fields

        field_type = None
        if field_type_str == "object":
            # Recursively call the function for nested objects
            field_type = json_schema_to_pyspark_struct(property_details)
        elif field_type_str == "array":
            # Handle arrays by getting the type of the items in the array
            item_schema = property_details.get("items", {})
            if not item_schema:
                # Default to StringType if items schema is not defined
                item_type = StringType()
            elif item_schema.get("type") == "object":
                # Recursively call for arrays of objects
                item_type = json_schema_to_pyspark_struct(item_schema)
            else:
                item_type = type_mapping.get(item_schema.get("type"))
                if not item_type:
                    raise ValueError(
                        f"Unsupported array item type: {item_schema.get('type')}"
                    )
            field_type = ArrayType(item_type, True)  # Arrays can contain nulls
        else:
            # Map simple types using the dictionary
            field_type = type_mapping.get(field_type_str)
            if not field_type:
                raise ValueError(
                    f"Unsupported type for property '{property_name}': {field_type_str}"
                )

        # Add the constructed field to our list of fields
        fields.append(StructField(property_name, field_type, is_nullable))

    return StructType(fields)
```
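The `Customer` schema only exercises the flat branch, so here is a quick sanity check of the nested-object and array branches. The `address` and `tags` fields below are made-up examples (not part of the schema above), and the call should give something like the commented output:

```python
nested_def = {
    "type": "object",
    "properties": {
        "address": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
        "tags": {"type": "array", "items": {"type": "string"}},
    },
    "required": ["address"],
}

json_schema_to_pyspark_struct(nested_def)
# StructType([
#     StructField('address', StructType([StructField('city', StringType(), False)]), False),
#     StructField('tags', ArrayType(StringType(), True), True)
# ])
```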
With that in place, you can obtain the `StructType` for the schema saved in `schema_def.json` as follows:
with open("schema_def.json") as f: schema_def = json.load(f) schema = json_schema_to_pyspark_struct(schema_def)
This yields:

```python
StructType([StructField('name', StringType(), False), StructField('age', LongType(), True)])
```
Nice, using this schema we can now read or create data that adheres to it. For example:
spark.createDataFrame([["Alice", 10], ["Bob", 11]], schema=schema)
What do you think? How do you solve this use case?
Disclaimer: The code above is mostly a collaboration with Gemini. So be careful ;)