Load data from Stripe to BigQuery
Here’s the easiest way to synchronize the data from Stripe to BigQuery, under the condition you have enabled Stripe Sigma (paid but worth it!).
1. Create Cloud Function that handles Sigma webhooks
Start by creating new Cloud Function.
Runtime variables
Make sure to add STRIPE_SECRET runtime variable:
index.js
const { Storage } = require("@google-cloud/storage");
const { BigQuery } = require("@google-cloud/bigquery");
const fetch = require("node-fetch");const storage = new Storage();
const bigquery = new BigQuery();
const bucket = "your-cloud-storage-bucket";async function uploadFile(fetchPromise, bucket, filename) {
let result = await fetchPromise;
return new Promise((resolve, reject) => {
const readStream = result.body;
const writeStream = storage
.bucket(bucket)
.file(filename)
.createWriteStream();readStream.on("error", reject);
writeStream.on("error", reject);
writeStream.on("finish", () => resolve(true));readStream.pipe(writeStream);
});
}exports.upload = async (req, res) => {
if (req.body.type === "sigma.scheduled_query_run.created") {
const [bq, _project, dataset, table] =
req.body.data.object.title.split(":");
if (bq !== "bigquery") {
return res.status(200).send(`Ignoring ${req.body.data.object.title}. No bigquery: prefix`);
}const url = req.body.data.object.file.url;
const fetchPromise = fetch(url, {
headers: {
Authorization:
"Basic " + Buffer.from(process.env.STRIPE_SECRET).toString("base64"),
},
});
const filename =
req.body.data.object.id + "/" + req.body.data.object.file.id;
await uploadFile(fetchPromise, bucket, filename);
const metadata = {
sourceFormat: "CSV",
skipLeadingRows: 1,
autodetect: true,
location: "EU",
};
try {
const [exists] = await bigquery.dataset(dataset).table(table).exists();
if (exists) {
await bigquery.dataset(dataset).table(table).delete();
}
await bigquery
.dataset(dataset)
.table(table)
.load(storage.bucket(bucket).file(filename), metadata);
} catch (e) {
return res.status(500).send(JSON.stringify(e));
}return res.status(200).send(req.body.data.object.file.id);
}
return res.status(500).send("Failed");
};
package.json
{
"name": "stripe-sigma",
"version": "0.0.1",
"dependencies": {
"@google-cloud/storage": "^5.11.0",
"@google-cloud/bigquery": "^5.7.0",
"node-fetch": "^2.6.1"
}
}
Deploy this function and copy the trigger URL:
2. Add Webhook endpoint
Go to Webhooks and create event, providing Endpoint URL from previous step and selecting sigma_scheduled_query_run.created
event:
3. Create Sigma Scheduled query
Go to Sigma dashboard (https://dashboard.stripe.com/sigma/queries)
Create a new query with any SQL you like (here I’m just loading all the charges data) and assign a name according to the convention: bigquery:project_id:dataset_id:table_name
Schedule this query using “Schedule” button, you can leave out emails so that you don’t get notified in any other way than Webhook.
From now on, the results from this query will be saved under stripe_charges
table in segmentation
dataset.