createDataIntegrationFlow
inline suspend fun SupplyChainClient.createDataIntegrationFlow(crossinline block: CreateDataIntegrationFlowRequest.Builder.() -> Unit): CreateDataIntegrationFlowResponse
Enables you to programmatically create a data pipeline to ingest data from source systems such as Amazon S3 buckets, to a predefined Amazon Web Services Supply Chain dataset (product, inbound_order) or a temporary dataset along with the data transformation query provided with the API.
Samples
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDatasetOptions
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDatasetSourceConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDatasetTargetConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDedupeStrategy
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDedupeStrategyType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowFieldPriorityDedupeField
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowFieldPriorityDedupeSortOrder
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowFieldPriorityDedupeStrategyConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowLoadType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowS3SourceConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowSource
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowSourceType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowSqlTransformationConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTarget
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTargetType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTransformation
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTransformationType
fun main() {
//sampleStart
// Successful CreateDataIntegrationFlow for s3 to dataset flow
val resp = supplyChainClient.createDataIntegrationFlow {
instanceId = "8850c54e-e187-4fa7-89d4-6370f165174d"
name = "testStagingFlow"
sources = listOf<DataIntegrationFlowSource>(
DataIntegrationFlowSource {
sourceType = DataIntegrationFlowSourceType.fromValue("S3")
sourceName = "testSourceName"
s3Source = DataIntegrationFlowS3SourceConfiguration {
bucketName = "aws-supply-chain-data-b8c7bb28-a576-4334-b481-6d6e8e47371f"
prefix = "example-prefix"
}
}
)
transformation = DataIntegrationFlowTransformation {
transformationType = DataIntegrationFlowTransformationType.fromValue("SQL")
sqlTransformation = DataIntegrationFlowSqlTransformationConfiguration {
query = "SELECT * FROM testSourceName"
}
}
target = DataIntegrationFlowTarget {
targetType = DataIntegrationFlowTargetType.fromValue("DATASET")
datasetTarget = DataIntegrationFlowDatasetTargetConfiguration {
datasetIdentifier = "arn:aws:scn:us-east-1:123456789012:instance/8850c54e-e187-4fa7-89d4-6370f165174d/namespaces/default/datasets/my_staging_dataset"
}
}
tags = mapOf<String, String>(
"tagKey1" to "tagValue1"
)
}
//sampleEnd
}
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDatasetOptions
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDatasetSourceConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDatasetTargetConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDedupeStrategy
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDedupeStrategyType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowFieldPriorityDedupeField
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowFieldPriorityDedupeSortOrder
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowFieldPriorityDedupeStrategyConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowLoadType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowS3SourceConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowSource
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowSourceType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowSqlTransformationConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTarget
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTargetType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTransformation
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTransformationType
fun main() {
//sampleStart
// Successful CreateDataIntegrationFlow for dataset to dataset flow
val resp = supplyChainClient.createDataIntegrationFlow {
instanceId = "8850c54e-e187-4fa7-89d4-6370f165174d"
name = "trading-partner"
sources = listOf<DataIntegrationFlowSource>(
DataIntegrationFlowSource {
sourceType = DataIntegrationFlowSourceType.fromValue("DATASET")
sourceName = "testSourceName1"
datasetSource = DataIntegrationFlowDatasetSourceConfiguration {
datasetIdentifier = "arn:aws:scn:us-east-1:123456789012:instance/8850c54e-e187-4fa7-89d4-6370f165174d/namespaces/default/datasets/my_staging_dataset1"
}
},
DataIntegrationFlowSource {
sourceType = DataIntegrationFlowSourceType.fromValue("DATASET")
sourceName = "testSourceName2"
datasetSource = DataIntegrationFlowDatasetSourceConfiguration {
datasetIdentifier = "arn:aws:scn:us-east-1:123456789012:instance/8850c54e-e187-4fa7-89d4-6370f165174d/namespaces/default/datasets/my_staging_dataset2"
}
}
)
transformation = DataIntegrationFlowTransformation {
transformationType = DataIntegrationFlowTransformationType.fromValue("SQL")
sqlTransformation = DataIntegrationFlowSqlTransformationConfiguration {
query = "SELECT S1.id AS id, S1.poc_org_unit_description AS description, S1.company_id AS company_id, S1.tpartner_type AS tpartner_type, S1.geo_id AS geo_id, S1.eff_start_date AS eff_start_date, S1.eff_end_date AS eff_end_date FROM testSourceName1 AS S1 LEFT JOIN testSourceName2 as S2 ON S1.id=S2.id"
}
}
target = DataIntegrationFlowTarget {
targetType = DataIntegrationFlowTargetType.fromValue("DATASET")
datasetTarget = DataIntegrationFlowDatasetTargetConfiguration {
datasetIdentifier = "arn:aws:scn:us-east-1:123456789012:instance/8850c54e-e187-4fa7-89d4-6370f165174d/namespaces/asc/datasets/trading_partner"
options = DataIntegrationFlowDatasetOptions {
loadType = DataIntegrationFlowLoadType.fromValue("REPLACE")
dedupeRecords = true
dedupeStrategy = DataIntegrationFlowDedupeStrategy {
type = DataIntegrationFlowDedupeStrategyType.fromValue("FIELD_PRIORITY")
fieldPriority = DataIntegrationFlowFieldPriorityDedupeStrategyConfiguration {
fields = listOf<DataIntegrationFlowFieldPriorityDedupeField>(
DataIntegrationFlowFieldPriorityDedupeField {
name = "eff_start_date"
sortOrder = DataIntegrationFlowFieldPriorityDedupeSortOrder.fromValue("DESC")
}
)
}
}
}
}
}
tags = mapOf<String, String>(
"tagKey1" to "tagValue1"
)
}
//sampleEnd
}