Dynamically creating an Avro schema
I recently ran into a use case where I had to convert data arriving as CSV files to Avro, and the schema was defined in a business document.
Avro needs a schema to write data, but there was no expectation that the business would provide an .avsc file. The schema could also evolve over time, so maintaining an .avsc file by hand would have been yet another challenge. That led me to design a system in which the schema is defined in a generic way and fed into the Avro generator. Here is the approach:
Create the Avro schema dynamically:
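import java.util.List;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;

// Start a record schema; its fields come from column definitions kept elsewhere.
FieldAssembler<Schema> record = SchemaBuilder.record("name")
        .namespace("org.data.test")
        .fields();
List<Columns> columns = getColumns();
for (Columns column : columns) {
    String fieldName = column.getName();
    String type = column.getType(); // STRING, INTEGER, ... are assumed to be String constants
    switch (type) {
        case STRING:
            record.requiredString(fieldName);
            break;
        case INTEGER:
            record.requiredInt(fieldName);
            break;
        case BOOLEAN:
            record.requiredBoolean(fieldName);
            break;
        case NUMERIC:
            record.requiredInt(fieldName);
            break;
        case DATETIME:
            // timestamp-millis: a long holding milliseconds since the epoch
            record.name(fieldName)
                    .type(LogicalTypes.timestampMillis()
                            .addToSchema(Schema.create(Schema.Type.LONG)))
                    .noDefault();
            break;
        case DATE:
            // date: an int holding days since the epoch
            record.name(fieldName)
                    .type(LogicalTypes.date()
                            .addToSchema(Schema.create(Schema.Type.INT)))
                    .noDefault();
            break;
        default:
            // fail fast instead of silently dropping the column
            throw new IllegalArgumentException("Unsupported type " + type + " for column " + fieldName);
    }
}
Schema schema = record.endRecord();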
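For reference, here is a self-contained sketch of the same loop that compiles and runs on its own (Java 16+ with the Avro library on the classpath). The `ColumnType` enum and `Column` record are hypothetical stand-ins for the `Columns` type and `getColumns()` helper, which are not shown here, and `Order` with its fields is made-up sample data. `schema.toString(true)` prints the schema as JSON, i.e. the content an .avsc file would hold.

```java
import java.util.List;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;

public class DynamicSchema {
    // Hypothetical column types standing in for the String constants used above.
    public enum ColumnType { STRING, INTEGER, BOOLEAN, NUMERIC, DATETIME, DATE }

    // Hypothetical column definition, e.g. one row of the business document.
    public record Column(String name, ColumnType type) {}

    // Builds a record schema with one required field per column.
    public static Schema build(String recordName, String namespace, List<Column> columns) {
        FieldAssembler<Schema> fields = SchemaBuilder.record(recordName)
                .namespace(namespace)
                .fields();
        for (Column column : columns) {
            String fieldName = column.name();
            switch (column.type()) {
                case STRING -> fields.requiredString(fieldName);
                // NUMERIC maps to int here, mirroring the snippet above
                case INTEGER, NUMERIC -> fields.requiredInt(fieldName);
                case BOOLEAN -> fields.requiredBoolean(fieldName);
                case DATETIME -> fields.name(fieldName)
                        .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)))
                        .noDefault();
                case DATE -> fields.name(fieldName)
                        .type(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)))
                        .noDefault();
            }
        }
        return fields.endRecord();
    }

    public static void main(String[] args) {
        Schema schema = build("Order", "org.data.test", List.of(
                new Column("id", ColumnType.INTEGER),
                new Column("customer", ColumnType.STRING),
                new Column("createdAt", ColumnType.DATETIME)));
        // Print the JSON form of the schema.
        System.out.println(schema.toString(true));
    }
}
```

Using an enum instead of String constants lets the compiler check the type names; an unknown type in the business document would then surface when it is mapped to `ColumnType`, for example via `ColumnType.valueOf`, which throws `IllegalArgumentException`.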
Handling of datetime
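In the schema, datetime is declared as a long (with the timestamp-millis logical type), so my Avro generator converts each datetime string to epoch milliseconds:
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Parses a "yyyy-MM-dd HH:mm:ss z" string into milliseconds since the epoch; empty input maps to 0 (the epoch).
public long getDateTimeFromUTCLong(String time) throws ParseException {
    if (time != null && time.length() > 0) {
        // SimpleDateFormat is not thread-safe, so a new instance is created per call
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
        return dateFormat.parse(time).getTime();
    }
    return 0;
}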
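Or, the Java 8 (java.time) way:
import java.time.Instant;
import java.time.format.DateTimeFormatter;
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z");
Instant instant = Instant.from(dtf.parse(date));
return instant.toEpochMilli(); // milliseconds, to match the timestamp-millis logical type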
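The schema also declares DATE columns as an int with the `date` logical type, which holds days since 1970-01-01, but only the datetime conversion is covered here. A minimal sketch of both conversions with java.time follows; `AvroTimeValues` and its method names are hypothetical, and the date helper assumes ISO `yyyy-MM-dd` input, which may not match the actual CSV. Note the unit: one day after the epoch is 86400000 as timestamp-millis, whereas `Instant.getEpochSecond()` gives 86400, which a reader of a timestamp-millis field would interpret as roughly 86 seconds after the epoch.

```java
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class AvroTimeValues {
    // Same pattern as above; the zone text (e.g. "UTC" or "GMT") is part of the input.
    // DateTimeFormatter is immutable and thread-safe, so one shared instance is fine.
    private static final DateTimeFormatter DATETIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z");

    // timestamp-millis: a long counting milliseconds since 1970-01-01T00:00:00Z.
    public static long toTimestampMillis(String text) {
        return ZonedDateTime.parse(text, DATETIME).toInstant().toEpochMilli();
    }

    // date: an int counting days since 1970-01-01 (assumes hypothetical ISO "yyyy-MM-dd" input).
    public static int toEpochDays(String text) {
        return Math.toIntExact(LocalDate.parse(text).toEpochDay());
    }

    public static void main(String[] args) {
        System.out.println(toTimestampMillis("1970-01-02 00:00:00 GMT")); // 86400000
        System.out.println(toEpochDays("1970-01-02"));                     // 1
    }
}
```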
Now create an Avro file using the schema generated above:
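import java.io.File;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
File file = new File(avroFileName);
dataFileWriter.create(schema, file);
GenericRecord row = new GenericData.Record(schema);
// set every required field before appending, e.g. row.put(fieldName, value);
dataFileWriter.append(row);
// ... append the remaining rows
dataFileWriter.close();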
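A `GenericData.Record` starts with every field unset, and appending one whose required fields are still null fails. A minimal end-to-end sketch of the CSV step follows, assuming the CSV columns appear in the same order as the schema fields and contain no quoted values or embedded commas (a real file may need a proper CSV parser). `CsvToAvro` and its methods are hypothetical names; the type switch covers only plain int, long, boolean and string fields, so DATETIME and DATE columns would first go through conversions like the ones in the datetime section. Reading the file back with `DataFileReader` is a quick way to check the output.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class CsvToAvro {
    // Converts one CSV line to a record, using each field's Avro type to choose a parser.
    // Naive comma split: assumes no quoted fields and CSV columns in schema field order.
    public static GenericRecord toRecord(Schema schema, String line) {
        String[] values = line.split(",", -1);
        GenericRecord row = new GenericData.Record(schema);
        for (Schema.Field field : schema.getFields()) {
            String raw = values[field.pos()].trim();
            Object value = switch (field.schema().getType()) {
                case INT -> Integer.parseInt(raw);
                case LONG -> Long.parseLong(raw);
                case BOOLEAN -> Boolean.parseBoolean(raw);
                default -> raw; // STRING (other types are not handled in this sketch)
            };
            row.put(field.name(), value);
        }
        return row;
    }

    // Writes all lines to an Avro container file; try-with-resources closes the writer.
    public static void write(Schema schema, List<String> lines, File file) throws IOException {
        try (DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.create(schema, file);
            for (String line : lines) {
                writer.append(toRecord(schema, line));
            }
        }
    }

    // Reads every record back; the schema is taken from the file header.
    public static List<GenericRecord> read(File file) throws IOException {
        List<GenericRecord> rows = new ArrayList<>();
        try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(file, new GenericDatumReader<GenericRecord>())) {
            for (GenericRecord record : reader) {
                rows.add(record);
            }
        }
        return rows;
    }

    public static void main(String[] args) throws IOException {
        Schema schema = SchemaBuilder.record("Order").namespace("org.data.test").fields()
                .requiredInt("id").requiredString("customer").requiredBoolean("paid")
                .endRecord();
        File file = File.createTempFile("orders", ".avro");
        write(schema, List.of("1,alice,true", "2,bob,false"), file);
        for (GenericRecord record : read(file)) {
            System.out.println(record);
        }
    }
}
```

Avro's generic reader returns string fields as `org.apache.avro.util.Utf8`, so call `toString()` before comparing them with Java strings.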