Flow Service is used to collect and centralize customer data from different sources within Adobe Experience Platform. The service provides a user interface and RESTful API from which all supported sources are connectable.
This tutorial uses the Flow Service API to walk you through the steps to create a streaming connection using the Flow Service API.
Getting started
This guide requires a working understanding of the following components of Adobe Experience Platform:
- Experience Data Model (XDM): The standardized framework by which Platform organizes experience data.
- Real-Time Customer Profile: Provides a unified, consumer profile in real time based on aggregated data from multiple sources.
Additionally, creating a streaming connection requires you to have a target XDM schema and a dataset. To learn how to create these, please read the tutorial on streaming record data or the tutorial on streaming time-series data.
Using Platform APIs
For information on how to successfully make calls to Platform APIs, see the guide on getting started with Platform APIs.
Create a base connection
A base connection specifies the source and contains the information required to make the flow compatible with streaming ingestion APIs. When creating a base connection, you have the option of creating a non-authenticated and an authenticated connection.
Non-authenticated connection
Non-authenticated connections are the standard streaming connection you can create when you want to stream data into Platform.
To create a non-authenticated base connection, make a POST request to the /connections
endpoint while providing a name for your connection, the data type, and the HTTP API connection specification ID. This ID is bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb
.
API format
POST /flowservice/connections
Request
The following request creates a base connection for HTTP API.
curl -X POST https://platform.adobe.io/data/foundation/flowservice/connections \ -H 'Authorization: Bearer {ACCESS_TOKEN}' \ -H 'Content-Type: application/json' \ -H 'x-gw-ims-org-id: {ORG_ID}' \ -H 'x-api-key: {API_KEY}' \ -H 'x-sandbox-name: {SANDBOX_NAME}' \ -d '{ "name": "ACME Streaming Connection XDM Data", "description": "ACME streaming connection for customer data", "connectionSpec": { "id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb", "version": "1.0" }, "auth": { "specName": "Streaming Connection", "params": { "dataType": "xdm" } } }'
curl -X POST https://platform.adobe.io/data/foundation/flowservice/connections \ -H 'Authorization: Bearer {ACCESS_TOKEN}' \ -H 'Content-Type: application/json' \ -H 'x-gw-ims-org-id: {ORG_ID}' \ -H 'x-api-key: {API_KEY}' \ -H 'x-sandbox-name: {SANDBOX_NAME}' \ -d '{ "name": "ACME Streaming Connection Raw Data", "description": "ACME streaming connection for customer data", "connectionSpec": { "id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb", "version": "1.0" }, "auth": { "specName": "Streaming Connection", "params": { "dataType": "raw" } } }'
Property | Description |
---|---|
name |
The name of your base connection. Ensure that the name is descriptive as you can use this to look up information on your base connection. |
description |
(Optional) A property that you can include to provide more information on your base connection. |
connectionSpec.id |
The connection specification ID that corresponds with HTTP API. This ID is bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb . |
auth.params.dataType |
The data type for the streaming connection. Supported values include: xdm and raw . |
auth.params.name |
The name of the streaming connection you want to create. |
Response
A successful response returns HTTP status 201 with details of the newly created connection, including its unique identifier (id
).
(Video) Power Automate HTTP action, Rest API, and IOT
{ "id": "a59d368a-1152-4673-a46e-bd52e8cdb9a9", "etag": "\"f50185ed-0000-0200-0000-637e8fad0000\""}
Property | Description |
---|---|
id |
The id of your newly created base connection. |
etag |
An identifier assigned to the connection, specifying the version of the base connection. |
Authenticated connection
Authenticated connections should be used when you need to differentiate between records coming from trusted and un-trusted sources. Users who want to send information with Personally Identifiable Information (PII) should create an authenticated connection when streaming information to Platform.
To create an authenticated base connection, you must include the authenticationRequired
parameter in your request and specify its value as true
. During this step, you can also provide a source ID for your authenticated base connection. This parameter is optional and will use the same value as the name
attribute, if it is not provided.
API format
POST /flowservice/connections
Request
The following request creates an authenticated base connection for HTTP API.
curl -X POST https://platform.adobe.io/data/foundation/flowservice/connections \ -H 'Authorization: Bearer {ACCESS_TOKEN}' \ -H 'Content-Type: application/json' \ -H 'x-gw-ims-org-id: {ORG_ID}' \ -H 'x-api-key: {API_KEY}' \ -H 'x-sandbox-name: {SANDBOX_NAME}' \ -d '{ "name": "ACME Streaming Connection XDM Data Authenticated", "description": "ACME streaming connection for customer data", "connectionSpec": { "id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb", "version": "1.0" }, "auth": { "specName": "Streaming Connection", "params": { "sourceId": "Authenticated XDM streaming connection", "dataType": "xdm", "name": "Authenticated XDM streaming connection", "authenticationRequired": true } } }
curl -X POST https://platform.adobe.io/data/foundation/flowservice/connections \ -H 'Authorization: Bearer {ACCESS_TOKEN}' \ -H 'Content-Type: application/json' \ -H 'x-gw-ims-org-id: {ORG_ID}' \ -H 'x-api-key: {API_KEY}' \ -H 'x-sandbox-name: {SANDBOX_NAME}' \ -d '{ "name": "ACME Streaming Connection Raw Data Authenticated", "description": "ACME streaming connection for customer data", "connectionSpec": { "id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb", "version": "1.0" }, "auth": { "specName": "Streaming Connection", "params": { "sourceId": "Authenticated raw streaming connection", "dataType": "raw", "name": "Authenticated raw streaming connection", "authenticationRequired": true } } }
Property | Description |
---|---|
auth.params.sourceId |
An additional identifier that can be used when creating an authenticated base connection. This parameter is optional and will use the same value as the name attribute, if it is not provided. |
auth.params.authenticationRequired |
This parameter specifies whether the streaming connection requires authentication or not. If authenticationRequired is set to true then authentication must be provided for the streaming connection. If authenticationRequired is set to false then authentication is not required. |
Response
A successful response returns HTTP status 201 with details of the newly created connection, including its unique identifier (id
).
{ "id": "a59d368a-1152-4673-a46e-bd52e8cdb9a9", "etag": "\"f50185ed-0000-0200-0000-637e8fad0000\""}
Get streaming endpoint URL
With the base connection created, you can now retrieve your streaming endpoint URL.
API format
GET /flowservice/connections/{BASE_CONNECTION_ID}
Parameter | Description |
---|---|
{BASE_CONNECTION_ID} |
The id value of the connection you previously created. |
Request
curl -X GET https://platform.adobe.io/data/foundation/flowservice/connections/{BASE_CONNECTION_ID} \ -H 'Authorization: Bearer {ACCESS_TOKEN}' \ -H 'x-gw-ims-org-id: {ORG_ID}' \ -H 'x-api-key: {API_KEY}' \ -H 'x-sandbox-name: {SANDBOX_NAME}'
Response
A successful response returns HTTP status 200 with detailed information about the requested connection. The streaming endpoint URL is automatically created with the connection, and can be retrieved using the inletUrl
value.
{ "items": [ { "id": "a59d368a-1152-4673-a46e-bd52e8cdb9a9", "createdAt": 1669238699119, "updatedAt": 1669238699119, "createdBy": "acme@AdobeID", "updatedBy": "acme@AdobeID", "createdClient": "{CREATED_CLIENT}", "updatedClient": "{UPDATED_CLIENT}", "sandboxId": "{SANDBOX_ID}", "sandboxName": "{SANDBOX_NAME}", "imsOrgId": "{ORG_ID}", "name": "ACME Streaming Connection XDM Data", "description": "ACME streaming connection for customer data", "connectionSpec": { "id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb", "version": "1.0" }, "state": "enabled", "auth": { "specName": "Streaming Connection", "params": { "sourceId": "ACME Streaming Connection XDM Data", "inletUrl": "https://dcs.adobedc.net/collection/667b41cf2dbf3509927da1ebf7e93c20afa727cc8d8373e51da18b62e1b985ec", "authenticationRequired": false, "inletId": "667b41cf2dbf3509927da1ebf7e93c20afa727cc8d8373e51da18b62e1b985ec", "dataType": "xdm", "name": "ACME Streaming Connection XDM Data" } }, "version": "\"f50185ed-0000-0200-0000-637e8fad0000\"", "etag": "\"f50185ed-0000-0200-0000-637e8fad0000\"" } ]}
Create a source connection
To create a source connection, make a POST request to the /sourceConnections
endpoint while providing your base connection ID.
(Video) Create a Power BI streaming dataset for real-time dashboards
API format
POST /flowservice/sourceConnections
Request
curl -X POST \ 'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \ -H 'Authorization: Bearer {ACCESS_TOKEN}' \ -H 'Content-Type: application/json' \ -H 'x-api-key: {API_KEY}' \ -H 'x-gw-ims-org-id: {ORG_ID}' \ -H 'x-sandbox-name: {SANDBOX_NAME}' \ -d '{ "name": "ACME Streaming Source Connection for Customer Data", "description": "A streaming source connection for ACME XDM Customer Data", "baseConnectionId": "a59d368a-1152-4673-a46e-bd52e8cdb9a9", "connectionSpec": { "id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb", "version": "1.0" } }'
Response
A successful response returns HTTP status 201 with detailed of the newly created source connection, including its unique identifier (id
).
{ "id": "34ece231-294d-416c-ad2a-5a5dfb2bc69f", "etag": "\"d505125b-0000-0200-0000-637eb7790000\""}
Create a target XDM schema
In order for the source data to be used in Platform, a target schema must be created to structure the source data according to your needs. The target schema is then used to create a Platform dataset in which the source data is contained.
A target XDM schema can be created by performing a POST request to the Schema Registry API.
For detailed steps on how to create a target XDM schema, see the tutorial on creating a schema using the API.
Create a target dataset
A target dataset can be created by performing a POST request to the Catalog Service API, providing the ID of the target schema within the payload.
Batch Ingestion API Guide | Adobe Experience Platform
For detailed steps on how to create a target dataset, see the tutorial on creating a dataset using the API.
Create a target connection
A target connection represents the connection to the destination where the ingested data lands in. To create a target connection, make POST request to /targetConnections
while providing IDs for your target dataset and target XDM schema. During this step, you must also provide the data lake connection specification ID. This ID is c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
API format
POST /flowservice/targetConnections
Request
curl -X POST \ 'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \ -H 'Authorization: Bearer {ACCESS_TOKEN}' \ -H 'Content-Type: application/json' \ -H 'x-api-key: {API_KEY}' \ -H 'x-gw-ims-org-id: {ORG_ID}' \ -H 'x-sandbox-name: {SANDBOX_NAME}' \ -d '{ "name": "ACME Streaming Target Connection", "description": "ACME Streaming Target Connection", "data": { "schema": { "id": "https://ns.adobe.com/{TENANT}/schemas/7f682c29f887512a897791e7161b90a1ae7ed3dd07a177b1", "version": "application/vnd.adobe.xed-full+json;version=1.0" } }, "params": { "dataSetId": "637eb7fadc8a211b6312b65b" }, "connectionSpec": { "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c", "version": "1.0" } }'
Response
A successful response returns HTTP status 201 with details of the newly created target connection, including its unique identifier (id
).
{ "id": "07f2f6ff-1da5-4704-916a-c615b873cba9", "etag": "\"340680f7-0000-0200-0000-637eb8730000\""}
Create a mapping
In order for the source data to be ingested into a target dataset, it must first be mapped to the target schema that the target dataset adheres to.
To create a mapping set, make a POST request to the mappingSets
endpoint of the Data Prep API while providing your target XDM schema $id
and the details of the mapping sets you want to create.
(Video) Microsoft Power Automate – How to do REST API Calls and update Excel – Full Tutorial
API format
POST /mappingSets
Request
curl -X POST \ 'https://platform.adobe.io/data/foundation/mappingSets' \ -H 'Authorization: Bearer {ACCESS_TOKEN}' \ -H 'x-api-key: {API_KEY}' \ -H 'x-gw-ims-org-id: {ORG_ID}' \ -H 'x-sandbox-name: {SANDBOX_NAME}' \ -H 'Content-Type: application/json' \ -d '{ "version": 0, "xdmSchema": "https://ns.adobe.com/{TENANT}/schemas/7f682c29f887512a897791e7161b90a1ae7ed3dd07a177b1", "xdmVersion": "1.0", "mappings": [ { "destinationXdmPath": "person.name.firstName", "sourceAttribute": "firstName", "identity": false, "version": 0 }, { "destinationXdmPath": "person.name.lastName", "sourceAttribute": "lastName", "identity": false, "version": 0 } ] }'
Property | Description |
---|---|
xdmSchema |
The $id of the target XDM schema. |
Response
A successful response returns details of the newly created mapping including its unique identifier (id
). This ID is required in a later step to create a dataflow.
{ "id": "79a623960d3f4969835c9e00dc90c8df", "version": 0, "createdDate": 1669249214031, "modifiedDate": 1669249214031, "createdBy": "acme@AdobeID", "modifiedBy": "acme@AdobeID"}
Create a dataflow
With your source and target connections created, you can now create a dataflow. The dataflow is responsible for scheduling and collecting data from a source. You can create a dataflow by performing a POST request to the /flows
endpoint.
API format
POST /flows
Request
The following request creates a streaming dataflow for HTTP API without data transformations.
curl -X POST \ 'https://platform.adobe.io/data/foundation/flowservice/flows' \ -H 'Authorization: Bearer {ACCESS_TOKEN}' \ -H 'Content-Type: application/json' \ -H 'x-api-key: {API_KEY}' \ -H 'x-gw-ims-org-id: {ORG_ID}' \ -H 'x-sandbox-name: {SANDBOX_NAME}' \ -d '{ "name": "ACME Streaming Dataflow", "description": "ACME streaming dataflow for customer data", "flowSpec": { "id": "d8a6f005-7eaf-4153-983e-e8574508b877", "version": "1.0" }, "sourceConnectionIds": [ "34ece231-294d-416c-ad2a-5a5dfb2bc69f" ], "targetConnectionIds": [ "07f2f6ff-1da5-4704-916a-c615b873cba9" ] }'
The following requests creates a streaming dataflow for HTTP API with mapping transformations applied to your data.
When creating a dataflow with transformations, the name
parameter cannot be changed. This value must always be set to Mapping
.
curl -X POST \ 'https://platform.adobe.io/data/foundation/flowservice/flows' \ -H 'Authorization: Bearer {ACCESS_TOKEN}' \ -H 'Content-Type: application/json' \ -H 'x-api-key: {API_KEY}' \ -H 'x-gw-ims-org-id: {ORG_ID}' \ -H 'x-sandbox-name: {SANDBOX_NAME}' \ -d '{ "name": "", "description": "", "flowSpec": { "id": "c1a19761-d2c7-4702-b9fa-fe91f0613e81", "version": "1.0" }, "sourceConnectionIds": [ "34ece231-294d-416c-ad2a-5a5dfb2bc69f" ], "targetConnectionIds": [ "07f2f6ff-1da5-4704-916a-c615b873cba9" ], "transformations": [ { "name": "Mapping", "params": { "mappingId": "79a623960d3f4969835c9e00dc90c8df", "mappingVersion": 0 } } ] }'
Property | Description |
---|---|
name |
The name of your dataflow. Ensure that the name of your dataflow is descriptive as you can use this to look up information on your dataflow. |
description |
(Optional) A property that you can include to provide more information on your dataflow. |
flowSpec.id |
The flow specification ID for HTTP API. To create a dataflow with transformations, you must use c1a19761-d2c7-4702-b9fa-fe91f0613e81 . To create a dataflow without transformations, use d8a6f005-7eaf-4153-983e-e8574508b877 . |
sourceConnectionIds |
The source connection ID retrieved in an earlier step. |
targetConnectionIds |
The target connection ID retrieved in an earlier step. |
transformations.params.mappingId |
The mapping ID retrieved in an earlier step. |
Response
A successful response returns HTTP status 201 with details of your newly created dataflow, including its unique identifier (id
).
{ "id": "f2ae0194-8bd8-4a40-a4d9-f07bdc3e6ce2", "etag": "\"dc0459ae-0000-0200-0000-637ebaec0000\""}
Post data to be ingested to Platform
Now that you’ve created your flow, you can send your JSON message to the streaming endpoint you previously created.
(Video) Working with HTTP Request and Response in Power Automate
API format
POST /collection/{INLET_URL}
Parameter | Description |
---|---|
{INLET_URL} |
Your streaming endpoint URL. You can retrieve this URL by making a GET request to the /connections endpoint while providing your base connection ID. |
Request
curl -X POST https://dcs.adobedc.net/collection/667b41cf2dbf3509927da1ebf7e93c20afa727cc8d8373e51da18b62e1b985ec \ -H 'Content-Type: application/json' \ -H 'x-adobe-flow-id: f2ae0194-8bd8-4a40-a4d9-f07bdc3e6ce2' \ -d '{ "header": { "schemaRef": { "id": "https://ns.adobe.com/{TENANT}/schemas/7f682c29f887512a897791e7161b90a1ae7ed3dd07a177b1", "contentType": "application/vnd.adobe.xed-full-notext+json; version=1.0" }, "flowId": "f2ae0194-8bd8-4a40-a4d9-f07bdc3e6ce2", "datasetId": "604a18a3bae67d18db6d258c" }, "body": { "xdmMeta": { "schemaRef": { "id": "https://ns.adobe.com/{TENANT}/schemas/7f682c29f887512a897791e7161b90a1ae7ed3dd07a177b1", "contentType": "application/vnd.adobe.xed-full-notext+json; version=1.0" } }, "xdmEntity": { "_id": "http-source-connector-acme-01", "person": { "name": { "firstName": "suman", "lastName": "nolan" } }, "workEmail": { "primary": true, "address": "suman@acme.com", "type": "work", "status": "active" } } } }'
curl -X POST https://dcs.adobedc.net/collection/667b41cf2dbf3509927da1ebf7e93c20afa727cc8d8373e51da18b62e1b985ec \ -H 'Content-Type: application/json' \ -H 'x-adobe-flow-id: 1f086c23-2ea8-4d06-886c-232ea8bd061d' \ -d '{ "name": "Johnson Smith", "location": { "city": "Seattle", "country": "United State of America", "address": "3692 Main Street" }, "gender": "Male", "birthday": { "year": 1984, "month": 6, "day": 9 } }'
Response
A successful response returns HTTP status 200 with details of the newly ingested information.
{ "inletId": "{BASE_CONNECTION_ID}", "xactionId": "1584479347507:2153:240", "receivedTimeMs": 1584479347507}
Property | Description |
---|---|
{BASE_CONNECTION_ID} |
The ID of the previously created streaming connection. |
xactionId |
A unique identifier generated server-side for the record you just sent. This ID helps Adobe trace this record’s lifecycle through various systems and with debugging. |
receivedTimeMs |
A timestamp (epoch in milliseconds) that shows what time the request was received. |
Next steps
By following this tutorial, you have created a streaming HTTP connection, enabling you to use the streaming endpoint to ingest data into Platform. For instructions to create a streaming connection in the UI, please read the creating a streaming connection tutorial.
To learn how to stream data to Platform, please read either the tutorial on streaming time series data or the tutorial on streaming record data.
Appendix
This section provides supplemental information on creating streaming connections using the API.
Sending messages to an authenticated streaming connection
If a streaming connection has authentication enabled, the client will be required to add the Authorization
header to their request.
If the Authorization
header is not present, or an invalid/expired access token is sent, an HTTP 401 Unauthorized response will be returned, with a similar response as below:
Response
{ "type": "https://ns.adobe.com/adobecloud/problem/data-collection-service-authorization", "status": "401", "title": "Authorization", "report": { "message": "[id] Ims service token is empty" }}
(Video) Streaming requests with fetch – HTTP 203