From da07a2ca3a8c5ee97eac179b28dc3d4a064fd026 Mon Sep 17 00:00:00 2001 From: tilpner Date: Thu, 28 May 2020 16:42:03 +0200 Subject: Initial commit --- src/atom.rs | 95 ++++++++++++++++++++++++++++++++++++ src/main.rs | 138 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/query/issues.rs | 118 ++++++++++++++++++++++++++++++++++++++++++++ src/query/labels.rs | 65 +++++++++++++++++++++++++ src/query/mod.rs | 87 +++++++++++++++++++++++++++++++++ 5 files changed, 503 insertions(+) create mode 100644 src/atom.rs create mode 100644 src/main.rs create mode 100644 src/query/issues.rs create mode 100644 src/query/labels.rs create mode 100644 src/query/mod.rs (limited to 'src') diff --git a/src/atom.rs b/src/atom.rs new file mode 100644 index 0000000..f431f42 --- /dev/null +++ b/src/atom.rs @@ -0,0 +1,95 @@ +use std::{ + path::PathBuf, + fs::{ self, File } +}; + +use sqlx::prelude::*; +use atom_syndication::*; +use anyhow::Result; +use futures::{ Stream, StreamExt }; + +use tracing::info; + +use crate::{ Conn, query::repo_id }; + +#[allow(dead_code)] +#[derive(sqlx::FromRow)] +struct Issue { + number: i64, + state: i64, + title: String, + body: String, + user_login: String, + html_url: String, + updated_at: i64 +} + +async fn query_issues_for_label<'conn>(conn: &'conn mut Conn, repo_id: i64, label: &str) -> impl Stream> + 'conn { + sqlx::query_as::<_, Issue>(r#" + SELECT issues.number, state, title, body, user_login, html_url, updated_at FROM issues + INNER JOIN is_labeled ON is_labeled.issue=issues.number + WHERE is_labeled.label=(SELECT id FROM labels WHERE repo=? AND name=?) + ORDER BY issues.number DESC + "#).bind(repo_id).bind(label) + .fetch(conn) +} + +fn issue_to_entry(issue: Issue) -> Entry { + EntryBuilder::default() + .title(issue.title) + .id(issue.html_url.clone()) + .links(vec![LinkBuilder::default() + .href(issue.html_url) + .build() + .expect("Failed to build link")]) + .content(ContentBuilder::default() + .content_type(Some(String::from("html"))) + .value(issue.body) + .build() + .expect("Failed to build content")) + .build() + .expect("Failed to build entry") +} + +pub async fn generate(mut conn: &mut Conn, (ref owner, ref name): (String, String), out_path: PathBuf, labels: Vec) -> Result<()> { + let labels = if labels.is_empty() { + sqlx::query_as::<_, (String,)>( + "SELECT name FROM labels WHERE repo=(SELECT id FROM repositories WHERE owner=? AND name=?)" + ).bind(owner).bind(name) + .fetch(&mut *conn) + .filter_map(|row| async { match row { + Ok((label,)) => Some(label), + _ => None + } }) + .collect() + .await + } else { labels }; + + let repo_id = repo_id(&mut conn, owner, name).await?; + + for label in labels { + info!("atom for {:?}", label); + + let mut feed = FeedBuilder::default(); + feed.title(label.clone()); + + let issues = query_issues_for_label(&mut conn, repo_id, &label).await; + feed.entries( + issues.filter_map(|issue| async { issue.ok() }) + .map(issue_to_entry) + .collect::>() + .await + ); + + let feed = feed.build().expect("Failed to build feed"); + + let feed_directory = out_path.join(label); + fs::create_dir_all(&feed_directory)?; + + let feed_path = feed_directory.join("atom.xml"); + let mut out_file = File::create(feed_path)?; + feed.write_to(&mut out_file)?; + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..196e587 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,138 @@ +use std::{ env, io, path::PathBuf }; +use structopt::StructOpt; +use sqlx::SqlitePool; +use tracing::info; +use tracing_subscriber::{ + fmt, filter, + layer::SubscriberExt, + util::SubscriberInitExt +}; + +use anyhow::{ anyhow, Result, Context }; + +pub mod query; +pub mod atom; + +#[derive(StructOpt)] +#[structopt(name = "github-label-feed")] +struct Opt { + #[structopt(subcommand)] + mode: OptMode, +} + +#[derive(StructOpt)] +enum OptMode { + List, + Sync { + repo: String, + #[structopt(long = "github-api-token", env = "GITHUB_TOKEN", hide_env_values = true)] + github_api_token: String + }, + Atom { + repo: String, + out_path: PathBuf, + labels: Vec + } +} + + +pub type Conn = sqlx::SqliteConnection; + +async fn init_db(conn: &mut Conn) { + // Naive init, all data is re-fetch-able, so no support for migrations + sqlx::query(r#" + PRAGMA foreign_keys = ON; + PRAGMA synchronous = OFF; + + CREATE TABLE IF NOT EXISTS repositories( + id integer PRIMARY KEY, + owner text, name text, + UNIQUE (owner, name) + ); + + CREATE TABLE IF NOT EXISTS issues( + repo integer REFERENCES repositories, + number integer, + state integer, title text, body text, + user_login text, + html_url text, + updated_at integer, + PRIMARY KEY (repo, number) + ); + + CREATE TABLE IF NOT EXISTS labels( + id integer PRIMARY KEY, + repo integer REFERENCES repositories, + name text, + UNIQUE (repo, name) + ); + + CREATE TABLE IF NOT EXISTS is_labeled( + repo integer, issue integer, + label integer RFERENCES labels, + PRIMARY KEY (issue, label), + FOREIGN KEY (repo, issue) REFERENCES issues + ); + "#).execute(conn) + .await + .expect("Failed to init database"); +} + +pub fn parse_repo(combined: &str) -> Result<(String, String)> { + let mut parts = combined + .split('/') + .map(str::trim) + .map(str::to_owned); + + match (parts.next(), parts.next()) { + (Some(r), Some(n)) => Ok((r, n)), + _ => Err(anyhow!("invalid repo format, expected owner/name: '{}'", combined)) + } +} + +fn main() -> Result<()> { + let env_spec = env::var("RUST_LOG") + .unwrap_or_else(|_| String::from("info")); + tracing_subscriber::registry() + .with(fmt::layer() + .without_time() + .with_writer(io::stderr)) + .with(filter::EnvFilter::new(env_spec)) + .init(); + + let opt = Opt::from_args(); + + smol::run(async { + let pool = SqlitePool::new("sqlite:./issues.sqlite").await?; + init_db(&mut *pool.acquire().await?).await; + + match opt.mode { + OptMode::List => { + let repos = query::list_repositories(&mut *pool.acquire().await?).await?; + for query::RepositoryInfo { owner, name, label_count, issue_count, .. } in repos { + println!("{}/{} ({} labels, {} issues)", owner, name, label_count, issue_count); + } + Ok(()) + }, + OptMode::Sync { repo, github_api_token } => { + info!("sync"); + let repo = parse_repo(&repo)?; + let mut tx = pool.begin().await?; + query::labels::update(&mut tx, &github_api_token, repo.clone()) + .await + .context("Failed to update labels")?; + query::issues::update(&mut tx, &github_api_token, repo) + .await + .context("Failed to update issues")?; + tx.commit().await?; + Ok(()) + }, + OptMode::Atom { repo, out_path, labels } => { + let repo = parse_repo(&repo)?; + atom::generate(&mut *pool.acquire().await?, repo, out_path, labels).await + .context("Failed to generate Atom feed")?; + Ok(()) + } + } + }) +} diff --git a/src/query/issues.rs b/src/query/issues.rs new file mode 100644 index 0000000..c829a22 --- /dev/null +++ b/src/query/issues.rs @@ -0,0 +1,118 @@ +#![allow(proc_macro_derive_resolution_fallback)] + +use graphql_client::{ GraphQLQuery, Response }; +use reqwest::Client; + +use chrono::{ Utc, TimeZone }; +use tracing::{ error, info, debug }; + +use crate::{ Conn, query::* }; + +type URI = String; +type HTML = String; +type DateTime = String; + +#[derive(GraphQLQuery)] +#[graphql( + // curl https://api.github.com/graphql -H 'Authorization: bearer ...' + schema_path = "graphql/github.json", + query_path = "graphql/issues.graphql", + response_derives = "Debug" +)] +pub struct IssuesQuery; + +fn state_to_integer(state: issues_query::IssueState) -> i64 { + use issues_query::IssueState::*; + match state { + OPEN => 0, + CLOSED => 1, + Other(_) => 2 + } +} + +pub async fn update(mut conn: &mut Conn, github_api_token: &str, (ref owner, ref name): (String, String)) -> anyhow::Result<()> { + let repo = repo_id(conn, owner, name).await?; + + let last_updated = last_updated(conn, repo) + .await? + .map(|t| Utc.timestamp(t, 0).to_rfc3339()); + info!("updating repo {}/{} ({}), last update from {:?}", owner, name, repo, last_updated); + + let client = Client::new(); + + let mut has_next_page = true; + let mut last_cursor = None; + while has_next_page { + eprint!("."); + let query = IssuesQuery::build_query(issues_query::Variables { + owner: owner.to_owned(), + name: name.to_owned(), + since: last_updated.clone(), + after: last_cursor.clone() + }); + + let res = graphql::query(&client, github_api_token, query).await?; + let response: Response = res.json().await?; + + for error in response.errors.unwrap_or_default() { + error!("{:?}", error); + } + + let repository = response.data + .expect("Missing response data") + .repository + .expect("Missing repository"); + + has_next_page = repository.issues.page_info.has_next_page; + debug!("has_next_page: {}", has_next_page); + let issues = repository.issues.edges.unwrap_or_default(); + + for issue in issues.into_iter().flatten() { + last_cursor = Some(issue.cursor); + if let Some(issue) = issue.node { + debug!("#{}: {}", issue.number, issue.title); + let ts = chrono::DateTime::parse_from_rfc3339(&issue.updated_at) + .expect("failed to parse datetime") + .timestamp(); + let author = issue.author + .map(|author| author.login) + .unwrap_or_else(|| String::from("ghost")); + + sqlx::query( + "REPLACE INTO issues (repo, number, state, title, body, user_login, html_url, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)" + ).bind(repo).bind(issue.number) + .bind(state_to_integer(issue.state)).bind(issue.title).bind(issue.body_html) + .bind(author).bind(issue.url).bind(ts) + .execute(&mut conn) + .await?; + + sqlx::query( + "DELETE FROM is_labeled WHERE repo=? AND issue=?" + ).bind(repo).bind(issue.number) + .execute(&mut conn) + .await?; + + let labels = issue.labels + .map(|l| l.edges) + .unwrap_or_default() + .unwrap_or_default() + .into_iter() + .flatten() + .map(|l| l.node) + .flatten(); + + for label in labels { + debug!("label: {}", label.name); + sqlx::query( + "INSERT INTO is_labeled (repo, issue, label) VALUES (?, ?, (SELECT id FROM labels WHERE name=?))" + ).bind(repo).bind(issue.number).bind(label.name) + .execute(&mut conn) + .await?; + } + } + } + } + + Ok(()) +} diff --git a/src/query/labels.rs b/src/query/labels.rs new file mode 100644 index 0000000..09774c6 --- /dev/null +++ b/src/query/labels.rs @@ -0,0 +1,65 @@ +use graphql_client::{ GraphQLQuery, Response }; +use reqwest::Client; + +use tracing::{ error, debug }; + +use crate::{ Conn, query::* }; + +type URI = String; + +#[derive(GraphQLQuery)] +#[graphql( + // curl https://api.github.com/graphql -H 'Authorization: bearer ...' + schema_path = "graphql/github.json", + query_path = "graphql/labels.graphql", + response_derives = "Debug" +)] +pub struct RepoLabels; + +pub async fn update(mut conn: &mut Conn, github_api_token: &str, (ref owner, ref name): (String, String)) -> anyhow::Result<()> { + let repo = repo_id(&mut conn, owner, name).await?; + + let client = Client::new(); + + let mut has_next_page = true; + let mut last_cursor = None; + while has_next_page { + let query = RepoLabels::build_query(repo_labels::Variables { + owner: owner.to_owned(), + name: name.to_owned(), + after: last_cursor.clone() + }); + + let res = graphql::query(&client, github_api_token, query).await?; + let response: Response = res.json().await?; + + for error in response.errors.unwrap_or_default() { + error!("{:?}", error); + } + + let repository = response.data + .expect("Missing response data") + .repository + .expect("Missing repository"); + + if repository.labels.is_none() { break } + let labels = repository.labels.unwrap(); + has_next_page = labels.page_info.has_next_page; + debug!("has_next_page: {}", has_next_page); + let labels = labels.edges.unwrap_or_default(); + + for label in labels.into_iter().flatten() { + last_cursor = Some(label.cursor); + if let Some(label) = label.node { + debug!("{}: {}", repo, label.name); + sqlx::query( + "INSERT OR IGNORE INTO labels (repo, name) VALUES (?, ?)" + ).bind(repo).bind(label.name) + .execute(&mut conn) + .await?; + } + } + } + + Ok(()) +} diff --git a/src/query/mod.rs b/src/query/mod.rs new file mode 100644 index 0000000..4e472c5 --- /dev/null +++ b/src/query/mod.rs @@ -0,0 +1,87 @@ +use sqlx::prelude::*; +use anyhow::{ Result, Context }; + +use crate::Conn; + +pub mod issues; +pub mod labels; + +#[derive(sqlx::FromRow, sqlx::Type)] +pub struct RepositoryInfo { + pub owner: String, + pub name: String, + pub label_count: i64, + pub issue_count: i64 +} + +pub async fn repo_id(conn: &mut Conn, owner: &str, name: &str) -> Result { + sqlx::query_as::<_, (i64,)>( + "INSERT OR IGNORE INTO repositories (owner, name) VALUES (?, ?); + SELECT id FROM repositories WHERE owner = ? AND name = ?" + ).bind(owner).bind(name) + .bind(owner).bind(name) + .fetch_one(conn) + .await + .map(|(id,)| id) + .with_context(|| format!("Couldn't find repo '{}/{}' in database", owner, name)) +} + +async fn last_updated(conn: &mut Conn, repo: i64) -> Result> { + sqlx::query_as::<_, (i64,)>( + "SELECT MAX(updated_at) FROM issues WHERE repo = ?", + ).bind(repo) + .fetch_optional(conn) + .await + .map(|opt| opt.map(|row| row.0)) + .with_context(|| format!("Couldn't find time of last update for repo id {}", repo)) +} + +pub async fn list_repositories(db: &mut Conn) -> sqlx::Result> { + sqlx::query_as( + "SELECT repositories.owner, repositories.name, + (SELECT count(id) FROM labels WHERE repo = repositories.id) AS label_count, + (SELECT count(number) FROM issues WHERE repo = repositories.id) AS issue_count + FROM repositories" + ).fetch_all(db) + .await +} + +pub mod graphql { + use std::time::Duration; + use reqwest::header; + use serde::Serialize; + use futures_retry::{ ErrorHandler, RetryPolicy, FutureRetry }; + use graphql_client::QueryBody; + + static API_ENDPOINT: &str = "https://api.github.com/graphql"; + static USER_AGENT: &str = "github.com/tilpner/github-label-feed"; + + static RETRY_DELAY: &[u64] = &[ 5, 50, 250, 1000, 5000, 25000 ]; + + pub struct RetryStrategy; + impl ErrorHandler for RetryStrategy { + type OutError = reqwest::Error; + + fn handle(&mut self, attempt: usize, e: reqwest::Error) -> RetryPolicy { + match RETRY_DELAY.get(attempt) { + Some(&ms) => RetryPolicy::WaitRetry(Duration::from_millis(ms)), + None => RetryPolicy::ForwardError(e) + } + } + } + + pub async fn query(client: &reqwest::Client, api_token: &str, query: QueryBody) -> reqwest::Result { + FutureRetry::new(|| { + client + .post(API_ENDPOINT) + .timeout(Duration::from_secs(60)) + .header(header::USER_AGENT, USER_AGENT) + .bearer_auth(api_token) + .json(&query) + .send() + }, RetryStrategy) + .await + .map(|(res, _)| res) + .map_err(|(e, _)| e) + } +} -- cgit v1.2.3