From da07a2ca3a8c5ee97eac179b28dc3d4a064fd026 Mon Sep 17 00:00:00 2001
From: tilpner
Date: Thu, 28 May 2020 16:42:03 +0200
Subject: Initial commit

---
 src/query/issues.rs | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/query/labels.rs |  77 +++++++++++++++++++++++++++++
 src/query/mod.rs    | 101 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 314 insertions(+)
 create mode 100644 src/query/issues.rs
 create mode 100644 src/query/labels.rs
 create mode 100644 src/query/mod.rs

(limited to 'src/query')

diff --git a/src/query/issues.rs b/src/query/issues.rs
new file mode 100644
index 0000000..c829a22
--- /dev/null
+++ b/src/query/issues.rs
@@ -0,0 +1,136 @@
+#![allow(proc_macro_derive_resolution_fallback)]
+
+use graphql_client::{ GraphQLQuery, Response };
+use reqwest::Client;
+
+use chrono::{ Utc, TimeZone };
+use tracing::{ error, info, debug };
+
+use crate::{ Conn, query::* };
+
+type URI = String;
+type HTML = String;
+type DateTime = String;
+
+/// Query type for `graphql/issues.graphql`; requests are built with
+/// `IssuesQuery::build_query`.
+#[derive(GraphQLQuery)]
+#[graphql(
+    // curl https://api.github.com/graphql -H 'Authorization: bearer ...'
+    schema_path = "graphql/github.json",
+    query_path = "graphql/issues.graphql",
+    response_derives = "Debug"
+)]
+pub struct IssuesQuery;
+
+/// Maps an issue state to the integer stored in the `issues.state` column:
+/// open is 0, closed is 1, any other state is 2.
+fn state_to_integer(state: issues_query::IssueState) -> i64 {
+    use issues_query::IssueState::*;
+    match state {
+        OPEN => 0,
+        CLOSED => 1,
+        Other(_) => 2
+    }
+}
+
+/// Fetches the issues of `owner/name` page by page and stores them, together
+/// with their label associations, in the database.
+///
+/// The newest `updated_at` already stored for the repository is passed to the
+/// query as `since`.
+///
+/// # Errors
+///
+/// Fails if a database statement or request fails, if the response carries no
+/// data or no repository, or if an issue timestamp is not valid RFC 3339.
+pub async fn update(mut conn: &mut Conn, github_api_token: &str, (ref owner, ref name): (String, String)) -> anyhow::Result<()> {
+    let repo = repo_id(conn, owner, name).await?;
+
+    let last_updated = last_updated(conn, repo)
+        .await?
+        .map(|t| Utc.timestamp(t, 0).to_rfc3339());
+    info!("updating repo {}/{} ({}), last update from {:?}", owner, name, repo, last_updated);
+
+    let client = Client::new();
+
+    let mut has_next_page = true;
+    let mut last_cursor = None;
+    while has_next_page {
+        eprint!(".");
+        let query = IssuesQuery::build_query(issues_query::Variables {
+            owner: owner.to_owned(),
+            name: name.to_owned(),
+            since: last_updated.clone(),
+            after: last_cursor.clone()
+        });
+
+        let res = graphql::query(&client, github_api_token, query).await?;
+        let response: Response<issues_query::ResponseData> = res.json().await?;
+
+        for error in response.errors.unwrap_or_default() {
+            error!("{:?}", error);
+        }
+
+        // Report an incomplete response as an error instead of panicking.
+        let repository = response.data
+            .ok_or_else(|| anyhow::anyhow!("Missing response data"))?
+            .repository
+            .ok_or_else(|| anyhow::anyhow!("Missing repository"))?;
+
+        has_next_page = repository.issues.page_info.has_next_page;
+        debug!("has_next_page: {}", has_next_page);
+        let issues = repository.issues.edges.unwrap_or_default();
+
+        for issue in issues.into_iter().flatten() {
+            last_cursor = Some(issue.cursor);
+            if let Some(issue) = issue.node {
+                debug!("#{}: {}", issue.number, issue.title);
+                let ts = chrono::DateTime::parse_from_rfc3339(&issue.updated_at)
+                    .map_err(|e| anyhow::anyhow!("failed to parse datetime: {}", e))?
+                    .timestamp();
+                let author = issue.author
+                    .map(|author| author.login)
+                    .unwrap_or_else(|| String::from("ghost"));
+
+                sqlx::query(
+                    "REPLACE INTO issues (repo, number, state, title, body, user_login, html_url, updated_at)
+                     VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
+                ).bind(repo).bind(issue.number)
+                 .bind(state_to_integer(issue.state)).bind(issue.title).bind(issue.body_html)
+                 .bind(author).bind(issue.url).bind(ts)
+                 .execute(&mut conn)
+                 .await?;
+
+                // Replace the issue's label associations wholesale.
+                sqlx::query(
+                    "DELETE FROM is_labeled WHERE repo=? AND issue=?"
+                ).bind(repo).bind(issue.number)
+                 .execute(&mut conn)
+                 .await?;
+
+                let labels = issue.labels
+                    .map(|l| l.edges)
+                    .unwrap_or_default()
+                    .unwrap_or_default()
+                    .into_iter()
+                    .flatten()
+                    .map(|l| l.node)
+                    .flatten();
+
+                for label in labels {
+                    debug!("label: {}", label.name);
+                    // Labels are stored per repository, so the lookup is
+                    // restricted to this repository.
+                    sqlx::query(
+                        "INSERT INTO is_labeled (repo, issue, label) VALUES (?, ?, (SELECT id FROM labels WHERE repo=? AND name=?))"
+                    ).bind(repo).bind(issue.number).bind(repo).bind(label.name)
+                     .execute(&mut conn)
+                     .await?;
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
diff --git a/src/query/labels.rs b/src/query/labels.rs
new file mode 100644
index 0000000..09774c6
--- /dev/null
+++ b/src/query/labels.rs
@@ -0,0 +1,77 @@
+use graphql_client::{ GraphQLQuery, Response };
+use reqwest::Client;
+
+use tracing::{ error, debug };
+
+use crate::{ Conn, query::* };
+
+type URI = String;
+
+/// Query type for `graphql/labels.graphql`; requests are built with
+/// `RepoLabels::build_query`.
+#[derive(GraphQLQuery)]
+#[graphql(
+    // curl https://api.github.com/graphql -H 'Authorization: bearer ...'
+    schema_path = "graphql/github.json",
+    query_path = "graphql/labels.graphql",
+    response_derives = "Debug"
+)]
+pub struct RepoLabels;
+
+/// Fetches the labels of `owner/name` page by page and stores them in the
+/// `labels` table with `INSERT OR IGNORE`.
+///
+/// # Errors
+///
+/// Fails if a database statement or request fails, or if the response carries
+/// no data or no repository.
+pub async fn update(mut conn: &mut Conn, github_api_token: &str, (ref owner, ref name): (String, String)) -> anyhow::Result<()> {
+    let repo = repo_id(&mut conn, owner, name).await?;
+
+    let client = Client::new();
+
+    let mut has_next_page = true;
+    let mut last_cursor = None;
+    while has_next_page {
+        let query = RepoLabels::build_query(repo_labels::Variables {
+            owner: owner.to_owned(),
+            name: name.to_owned(),
+            after: last_cursor.clone()
+        });
+
+        let res = graphql::query(&client, github_api_token, query).await?;
+        let response: Response<repo_labels::ResponseData> = res.json().await?;
+
+        for error in response.errors.unwrap_or_default() {
+            error!("{:?}", error);
+        }
+
+        // Report an incomplete response as an error instead of panicking.
+        let repository = response.data
+            .ok_or_else(|| anyhow::anyhow!("Missing response data"))?
+            .repository
+            .ok_or_else(|| anyhow::anyhow!("Missing repository"))?;
+
+        let labels = match repository.labels {
+            Some(labels) => labels,
+            None => break
+        };
+        has_next_page = labels.page_info.has_next_page;
+        debug!("has_next_page: {}", has_next_page);
+        let labels = labels.edges.unwrap_or_default();
+
+        for label in labels.into_iter().flatten() {
+            last_cursor = Some(label.cursor);
+            if let Some(label) = label.node {
+                debug!("{}: {}", repo, label.name);
+                sqlx::query(
+                    "INSERT OR IGNORE INTO labels (repo, name) VALUES (?, ?)"
+                ).bind(repo).bind(label.name)
+                 .execute(&mut conn)
+                 .await?;
+            }
+        }
+    }
+
+    Ok(())
+}
diff --git a/src/query/mod.rs b/src/query/mod.rs
new file mode 100644
index 0000000..4e472c5
--- /dev/null
+++ b/src/query/mod.rs
@@ -0,0 +1,101 @@
+use sqlx::prelude::*;
+use anyhow::{ Result, Context };
+
+use crate::Conn;
+
+pub mod issues;
+pub mod labels;
+
+/// One row of the listing returned by `list_repositories`.
+#[derive(sqlx::FromRow, sqlx::Type)]
+pub struct RepositoryInfo {
+    pub owner: String,
+    pub name: String,
+    pub label_count: i64,
+    pub issue_count: i64
+}
+
+/// Returns the database id of `owner/name`, first running an
+/// `INSERT OR IGNORE` for the repository so that a row exists.
+pub async fn repo_id(conn: &mut Conn, owner: &str, name: &str) -> Result<i64> {
+    sqlx::query_as::<_, (i64,)>(
+        "INSERT OR IGNORE INTO repositories (owner, name) VALUES (?, ?);
+         SELECT id FROM repositories WHERE owner = ? AND name = ?"
+    ).bind(owner).bind(name)
+     .bind(owner).bind(name)
+     .fetch_one(conn)
+     .await
+     .map(|(id,)| id)
+     .with_context(|| format!("Couldn't find repo '{}/{}' in database", owner, name))
+}
+
+/// Returns the largest `updated_at` stored for the issues of `repo`, or
+/// `None` if no issue is stored for it.
+async fn last_updated(conn: &mut Conn, repo: i64) -> Result<Option<i64>> {
+    // `MAX` over zero rows still yields one row, holding NULL, so the
+    // column is decoded as an `Option`.
+    sqlx::query_as::<_, (Option<i64>,)>(
+        "SELECT MAX(updated_at) FROM issues WHERE repo = ?",
+    ).bind(repo)
+     .fetch_optional(conn)
+     .await
+     .map(|opt| opt.and_then(|row| row.0))
+     .with_context(|| format!("Couldn't find time of last update for repo id {}", repo))
+}
+
+/// Lists all stored repositories with their label and issue counts.
+pub async fn list_repositories(db: &mut Conn) -> sqlx::Result<Vec<RepositoryInfo>> {
+    sqlx::query_as(
+        "SELECT repositories.owner, repositories.name,
+         (SELECT count(id) FROM labels WHERE repo = repositories.id) AS label_count,
+         (SELECT count(number) FROM issues WHERE repo = repositories.id) AS issue_count
+         FROM repositories"
+    ).fetch_all(db)
+     .await
+}
+
+/// Sends GraphQL queries to the GitHub API, retrying failed requests.
+pub mod graphql {
+    use std::time::Duration;
+    use reqwest::header;
+    use serde::Serialize;
+    use futures_retry::{ ErrorHandler, RetryPolicy, FutureRetry };
+    use graphql_client::QueryBody;
+
+    static API_ENDPOINT: &str = "https://api.github.com/graphql";
+    static USER_AGENT: &str = "github.com/tilpner/github-label-feed";
+
+    /// Delay in milliseconds before a retry, looked up by attempt number.
+    static RETRY_DELAY: &[u64] = &[ 5, 50, 250, 1000, 5000, 25000 ];
+
+    /// Retries after the delays in `RETRY_DELAY` and forwards the error once
+    /// the attempt number is past the end of that list.
+    pub struct RetryStrategy;
+    impl ErrorHandler<reqwest::Error> for RetryStrategy {
+        type OutError = reqwest::Error;
+
+        fn handle(&mut self, attempt: usize, e: reqwest::Error) -> RetryPolicy<reqwest::Error> {
+            match RETRY_DELAY.get(attempt) {
+                Some(&ms) => RetryPolicy::WaitRetry(Duration::from_millis(ms)),
+                None => RetryPolicy::ForwardError(e)
+            }
+        }
+    }
+
+    /// Posts `query` to the GitHub GraphQL endpoint with bearer authentication
+    /// and a 60 second timeout, retrying according to `RetryStrategy`.
+    pub async fn query<T: Serialize>(client: &reqwest::Client, api_token: &str, query: QueryBody<T>) -> reqwest::Result<reqwest::Response> {
+        FutureRetry::new(|| {
+            client
+                .post(API_ENDPOINT)
+                .timeout(Duration::from_secs(60))
+                .header(header::USER_AGENT, USER_AGENT)
+                .bearer_auth(api_token)
+                .json(&query)
+                .send()
+        }, RetryStrategy)
+            .await
+            .map(|(res, _)| res)
+            .map_err(|(e, _)| e)
+    }
+}
--
cgit v1.2.3