createDataIntegrationFlow

Enables you to programmatically create a data pipeline that ingests data from source systems, such as Amazon S3 buckets, into either a predefined Amazon Web Services Supply Chain dataset (for example, product or inbound_order) or a temporary dataset, applying the data transformation query provided with the API.

Samples

import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDatasetOptions
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDatasetSourceConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDatasetTargetConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDedupeStrategy
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDedupeStrategyType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowFieldPriorityDedupeField
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowFieldPriorityDedupeSortOrder
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowFieldPriorityDedupeStrategyConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowLoadType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowS3SourceConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowSource
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowSourceType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowSqlTransformationConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTarget
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTargetType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTransformation
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTransformationType

fun main() {
    //sampleStart
    // Successful CreateDataIntegrationFlow for s3 to dataset flow
    // NOTE(review): `supplyChainClient` is not declared in this snippet; it is presumably a
    // client instance provided by the surrounding code — confirm before running.

    // The single input of the flow: objects under a prefix in an S3 bucket.
    // The SQL query below refers to this source by the same name.
    val s3Input = DataIntegrationFlowSource {
        sourceType = DataIntegrationFlowSourceType.fromValue("S3")
        sourceName = "testSourceName"
        s3Source = DataIntegrationFlowS3SourceConfiguration {
            bucketName = "aws-supply-chain-data-b8c7bb28-a576-4334-b481-6d6e8e47371f"
            prefix = "example-prefix"
        }
    }

    // SQL transformation that selects every column from the source above.
    val sqlStep = DataIntegrationFlowTransformation {
        transformationType = DataIntegrationFlowTransformationType.fromValue("SQL")
        sqlTransformation = DataIntegrationFlowSqlTransformationConfiguration {
            query = "SELECT * FROM testSourceName"
        }
    }

    // Output of the flow: a dataset identified by its ARN.
    val stagingTarget = DataIntegrationFlowTarget {
        targetType = DataIntegrationFlowTargetType.fromValue("DATASET")
        datasetTarget = DataIntegrationFlowDatasetTargetConfiguration {
            datasetIdentifier = "arn:aws:scn:us-east-1:123456789012:instance/8850c54e-e187-4fa7-89d4-6370f165174d/namespaces/default/datasets/my_staging_dataset"
        }
    }

    // Assemble the request from the pieces built above and send it.
    val resp = supplyChainClient.createDataIntegrationFlow {
        instanceId = "8850c54e-e187-4fa7-89d4-6370f165174d"
        name = "testStagingFlow"
        sources = listOf(s3Input)
        transformation = sqlStep
        target = stagingTarget
        tags = mapOf("tagKey1" to "tagValue1")
    }
    //sampleEnd
}
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDatasetOptions
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDatasetSourceConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDatasetTargetConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDedupeStrategy
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowDedupeStrategyType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowFieldPriorityDedupeField
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowFieldPriorityDedupeSortOrder
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowFieldPriorityDedupeStrategyConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowLoadType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowS3SourceConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowSource
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowSourceType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowSqlTransformationConfiguration
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTarget
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTargetType
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTransformation
import aws.sdk.kotlin.services.supplychain.model.DataIntegrationFlowTransformationType

fun main() {
    //sampleStart
    // Successful CreateDataIntegrationFlow for dataset to dataset flow
    // NOTE(review): `supplyChainClient` is not declared in this snippet; it is presumably a
    // client instance provided by the surrounding code — confirm before running.

    // Two dataset-backed inputs; the SQL query below refers to them by these source names.
    val firstDatasetInput = DataIntegrationFlowSource {
        sourceType = DataIntegrationFlowSourceType.fromValue("DATASET")
        sourceName = "testSourceName1"
        datasetSource = DataIntegrationFlowDatasetSourceConfiguration {
            datasetIdentifier = "arn:aws:scn:us-east-1:123456789012:instance/8850c54e-e187-4fa7-89d4-6370f165174d/namespaces/default/datasets/my_staging_dataset1"
        }
    }
    val secondDatasetInput = DataIntegrationFlowSource {
        sourceType = DataIntegrationFlowSourceType.fromValue("DATASET")
        sourceName = "testSourceName2"
        datasetSource = DataIntegrationFlowDatasetSourceConfiguration {
            datasetIdentifier = "arn:aws:scn:us-east-1:123456789012:instance/8850c54e-e187-4fa7-89d4-6370f165174d/namespaces/default/datasets/my_staging_dataset2"
        }
    }

    // SQL transformation: selects columns from testSourceName1 (aliased S1),
    // left-joined to testSourceName2 (aliased S2) on id.
    val joinStep = DataIntegrationFlowTransformation {
        transformationType = DataIntegrationFlowTransformationType.fromValue("SQL")
        sqlTransformation = DataIntegrationFlowSqlTransformationConfiguration {
            query = "SELECT S1.id AS id, S1.poc_org_unit_description AS description, S1.company_id AS company_id, S1.tpartner_type AS tpartner_type, S1.geo_id AS geo_id, S1.eff_start_date AS eff_start_date, S1.eff_end_date AS eff_end_date FROM testSourceName1 AS S1 LEFT JOIN testSourceName2 as S2 ON S1.id=S2.id"
        }
    }

    // Dedupe configuration: FIELD_PRIORITY strategy with one field,
    // eff_start_date, using the DESC sort order.
    val startDateDescending = DataIntegrationFlowFieldPriorityDedupeField {
        name = "eff_start_date"
        sortOrder = DataIntegrationFlowFieldPriorityDedupeSortOrder.fromValue("DESC")
    }
    val fieldPriorityDedupe = DataIntegrationFlowDedupeStrategy {
        type = DataIntegrationFlowDedupeStrategyType.fromValue("FIELD_PRIORITY")
        fieldPriority = DataIntegrationFlowFieldPriorityDedupeStrategyConfiguration {
            fields = listOf(startDateDescending)
        }
    }

    // Target dataset options: REPLACE load type with record deduplication enabled.
    val replaceWithDedupe = DataIntegrationFlowDatasetOptions {
        loadType = DataIntegrationFlowLoadType.fromValue("REPLACE")
        dedupeRecords = true
        dedupeStrategy = fieldPriorityDedupe
    }

    // Output of the flow: the trading_partner dataset in the asc namespace.
    val tradingPartnerTarget = DataIntegrationFlowTarget {
        targetType = DataIntegrationFlowTargetType.fromValue("DATASET")
        datasetTarget = DataIntegrationFlowDatasetTargetConfiguration {
            datasetIdentifier = "arn:aws:scn:us-east-1:123456789012:instance/8850c54e-e187-4fa7-89d4-6370f165174d/namespaces/asc/datasets/trading_partner"
            options = replaceWithDedupe
        }
    }

    // Assemble the request from the pieces built above and send it.
    val resp = supplyChainClient.createDataIntegrationFlow {
        instanceId = "8850c54e-e187-4fa7-89d4-6370f165174d"
        name = "trading-partner"
        sources = listOf(firstDatasetInput, secondDatasetInput)
        transformation = joinStep
        target = tradingPartnerTarget
        tags = mapOf("tagKey1" to "tagValue1")
    }
    //sampleEnd
}