Diffstat (limited to 'src')
-rw-r--r--   src/atom.rs           95
-rw-r--r--   src/main.rs          138
-rw-r--r--   src/query/issues.rs  118
-rw-r--r--   src/query/labels.rs   65
-rw-r--r--   src/query/mod.rs      87
5 files changed, 503 insertions, 0 deletions
diff --git a/src/atom.rs b/src/atom.rs
new file mode 100644
index 0000000..f431f42
--- /dev/null
+++ b/src/atom.rs
@@ -0,0 +1,95 @@
+use std::{
+ path::PathBuf,
+ fs::{ self, File }
+};
+
+use sqlx::prelude::*;
+use atom_syndication::*;
+use anyhow::Result;
+use futures::{ Stream, StreamExt };
+
+use tracing::info;
+
+use crate::{ Conn, query::repo_id };
+
+#[allow(dead_code)]
+#[derive(sqlx::FromRow)]
+struct Issue {
+ number: i64,
+ state: i64,
+ title: String,
+ body: String,
+ user_login: String,
+ html_url: String,
+ updated_at: i64
+}
+
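+/// Streams issues carrying `label` for the repository `repo_id`, ordered by issue number descending.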
+async fn query_issues_for_label<'conn>(conn: &'conn mut Conn, repo_id: i64, label: &str) -> impl Stream<Item=sqlx::Result<Issue>> + 'conn {
+ sqlx::query_as::<_, Issue>(r#"
+ SELECT issues.number, state, title, body, user_login, html_url, updated_at FROM issues
+ INNER JOIN is_labeled ON is_labeled.issue=issues.number
+ WHERE is_labeled.label=(SELECT id FROM labels WHERE repo=? AND name=?)
+ ORDER BY issues.number DESC
+ "#).bind(repo_id).bind(label)
+ .fetch(conn)
+}
+
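+/// Converts a database issue row into an Atom feed entry, using the issue URL as the entry id and link.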
+fn issue_to_entry(issue: Issue) -> Entry {
+ EntryBuilder::default()
+ .title(issue.title)
+ .id(issue.html_url.clone())
+ .links(vec![LinkBuilder::default()
+ .href(issue.html_url)
+ .build()
+ .expect("Failed to build link")])
+ .content(ContentBuilder::default()
+ .content_type(Some(String::from("html")))
+ .value(issue.body)
+ .build()
+ .expect("Failed to build content"))
+ .build()
+ .expect("Failed to build entry")
+}
+
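+/// Writes one Atom feed per label under `out_path`; when no labels are given, all labels known for the repository are used.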
+pub async fn generate(mut conn: &mut Conn, (ref owner, ref name): (String, String), out_path: PathBuf, labels: Vec<String>) -> Result<()> {
+ let labels = if labels.is_empty() {
+ sqlx::query_as::<_, (String,)>(
+ "SELECT name FROM labels WHERE repo=(SELECT id FROM repositories WHERE owner=? AND name=?)"
+ ).bind(owner).bind(name)
+ .fetch(&mut *conn)
+ .filter_map(|row| async { match row {
+ Ok((label,)) => Some(label),
+ _ => None
+ } })
+ .collect()
+ .await
+ } else { labels };
+
+ let repo_id = repo_id(&mut conn, owner, name).await?;
+
+ for label in labels {
+ info!("atom for {:?}", label);
+
+ let mut feed = FeedBuilder::default();
+ feed.title(label.clone());
+
+ let issues = query_issues_for_label(&mut conn, repo_id, &label).await;
+ feed.entries(
+ issues.filter_map(|issue| async { issue.ok() })
+ .map(issue_to_entry)
+ .collect::<Vec<_>>()
+ .await
+ );
+
+ let feed = feed.build().expect("Failed to build feed");
+
+ let feed_directory = out_path.join(label);
+ fs::create_dir_all(&feed_directory)?;
+
+ let feed_path = feed_directory.join("atom.xml");
+ let mut out_file = File::create(feed_path)?;
+ feed.write_to(&mut out_file)?;
+ }
+
+ Ok(())
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..196e587
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,138 @@
+use std::{ env, io, path::PathBuf };
+use structopt::StructOpt;
+use sqlx::SqlitePool;
+use tracing::info;
+use tracing_subscriber::{
+ fmt, filter,
+ layer::SubscriberExt,
+ util::SubscriberInitExt
+};
+
+use anyhow::{ anyhow, Result, Context };
+
+pub mod query;
+pub mod atom;
+
+#[derive(StructOpt)]
+#[structopt(name = "github-label-feed")]
+struct Opt {
+ #[structopt(subcommand)]
+ mode: OptMode,
+}
+
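+// Example invocations (illustrative; the token may also come from GITHUB_TOKEN):
+//   github-label-feed list
+//   github-label-feed sync owner/name --github-api-token <token>
+//   github-label-feed atom owner/name ./feeds bug enhancement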
+#[derive(StructOpt)]
+enum OptMode {
+ List,
+ Sync {
+ repo: String,
+ #[structopt(long = "github-api-token", env = "GITHUB_TOKEN", hide_env_values = true)]
+ github_api_token: String
+ },
+ Atom {
+ repo: String,
+ out_path: PathBuf,
+ labels: Vec<String>
+ }
+}
+
+
+pub type Conn = sqlx::SqliteConnection;
+
+async fn init_db(conn: &mut Conn) {
+ // Naive init, all data is re-fetch-able, so no support for migrations
+ sqlx::query(r#"
+ PRAGMA foreign_keys = ON;
+ PRAGMA synchronous = OFF;
+
+ CREATE TABLE IF NOT EXISTS repositories(
+ id integer PRIMARY KEY,
+ owner text, name text,
+ UNIQUE (owner, name)
+ );
+
+ CREATE TABLE IF NOT EXISTS issues(
+ repo integer REFERENCES repositories,
+ number integer,
+ state integer, title text, body text,
+ user_login text,
+ html_url text,
+ updated_at integer,
+ PRIMARY KEY (repo, number)
+ );
+
+ CREATE TABLE IF NOT EXISTS labels(
+ id integer PRIMARY KEY,
+ repo integer REFERENCES repositories,
+ name text,
+ UNIQUE (repo, name)
+ );
+
+ CREATE TABLE IF NOT EXISTS is_labeled(
+ repo integer, issue integer,
+ label integer REFERENCES labels,
+ PRIMARY KEY (issue, label),
+ FOREIGN KEY (repo, issue) REFERENCES issues
+ );
+ "#).execute(conn)
+ .await
+ .expect("Failed to init database");
+}
+
+pub fn parse_repo(combined: &str) -> Result<(String, String)> {
+ let mut parts = combined
+ .split('/')
+ .map(str::trim)
+ .map(str::to_owned);
+
+ match (parts.next(), parts.next()) {
+ (Some(r), Some(n)) => Ok((r, n)),
+ _ => Err(anyhow!("invalid repo format, expected owner/name: '{}'", combined))
+ }
+}
+
+fn main() -> Result<()> {
+ let env_spec = env::var("RUST_LOG")
+ .unwrap_or_else(|_| String::from("info"));
+ tracing_subscriber::registry()
+ .with(fmt::layer()
+ .without_time()
+ .with_writer(io::stderr))
+ .with(filter::EnvFilter::new(env_spec))
+ .init();
+
+ let opt = Opt::from_args();
+
+ smol::run(async {
+ let pool = SqlitePool::new("sqlite:./issues.sqlite").await?;
+ init_db(&mut *pool.acquire().await?).await;
+
+ match opt.mode {
+ OptMode::List => {
+ let repos = query::list_repositories(&mut *pool.acquire().await?).await?;
+ for query::RepositoryInfo { owner, name, label_count, issue_count, .. } in repos {
+ println!("{}/{} ({} labels, {} issues)", owner, name, label_count, issue_count);
+ }
+ Ok(())
+ },
+ OptMode::Sync { repo, github_api_token } => {
+ info!("sync");
+ let repo = parse_repo(&repo)?;
+ let mut tx = pool.begin().await?;
+ query::labels::update(&mut tx, &github_api_token, repo.clone())
+ .await
+ .context("Failed to update labels")?;
+ query::issues::update(&mut tx, &github_api_token, repo)
+ .await
+ .context("Failed to update issues")?;
+ tx.commit().await?;
+ Ok(())
+ },
+ OptMode::Atom { repo, out_path, labels } => {
+ let repo = parse_repo(&repo)?;
+ atom::generate(&mut *pool.acquire().await?, repo, out_path, labels).await
+ .context("Failed to generate Atom feed")?;
+ Ok(())
+ }
+ }
+ })
+}
diff --git a/src/query/issues.rs b/src/query/issues.rs
new file mode 100644
index 0000000..c829a22
--- /dev/null
+++ b/src/query/issues.rs
@@ -0,0 +1,118 @@
+#![allow(proc_macro_derive_resolution_fallback)]
+
+use graphql_client::{ GraphQLQuery, Response };
+use reqwest::Client;
+
+use chrono::{ Utc, TimeZone };
+use tracing::{ error, info, debug };
+
+use crate::{ Conn, query::* };
+
+type URI = String;
+type HTML = String;
+type DateTime = String;
+
+#[derive(GraphQLQuery)]
+#[graphql(
+ // curl https://api.github.com/graphql -H 'Authorization: bearer ...'
+ schema_path = "graphql/github.json",
+ query_path = "graphql/issues.graphql",
+ response_derives = "Debug"
+)]
+pub struct IssuesQuery;
+
+fn state_to_integer(state: issues_query::IssueState) -> i64 {
+ use issues_query::IssueState::*;
+ match state {
+ OPEN => 0,
+ CLOSED => 1,
+ Other(_) => 2
+ }
+}
+
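+/// Fetches issues updated since the last stored timestamp via the GitHub GraphQL API
+/// and upserts them together with their label assignments.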
+pub async fn update(mut conn: &mut Conn, github_api_token: &str, (ref owner, ref name): (String, String)) -> anyhow::Result<()> {
+ let repo = repo_id(conn, owner, name).await?;
+
+ let last_updated = last_updated(conn, repo)
+ .await?
+ .map(|t| Utc.timestamp(t, 0).to_rfc3339());
+ info!("updating repo {}/{} ({}), last update from {:?}", owner, name, repo, last_updated);
+
+ let client = Client::new();
+
+ let mut has_next_page = true;
+ let mut last_cursor = None;
+ while has_next_page {
+ eprint!(".");
+ let query = IssuesQuery::build_query(issues_query::Variables {
+ owner: owner.to_owned(),
+ name: name.to_owned(),
+ since: last_updated.clone(),
+ after: last_cursor.clone()
+ });
+
+ let res = graphql::query(&client, github_api_token, query).await?;
+ let response: Response<issues_query::ResponseData> = res.json().await?;
+
+ for error in response.errors.unwrap_or_default() {
+ error!("{:?}", error);
+ }
+
+ let repository = response.data
+ .expect("Missing response data")
+ .repository
+ .expect("Missing repository");
+
+ has_next_page = repository.issues.page_info.has_next_page;
+ debug!("has_next_page: {}", has_next_page);
+ let issues = repository.issues.edges.unwrap_or_default();
+
+ for issue in issues.into_iter().flatten() {
+ last_cursor = Some(issue.cursor);
+ if let Some(issue) = issue.node {
+ debug!("#{}: {}", issue.number, issue.title);
+ let ts = chrono::DateTime::parse_from_rfc3339(&issue.updated_at)
+ .expect("failed to parse datetime")
+ .timestamp();
+ let author = issue.author
+ .map(|author| author.login)
+ .unwrap_or_else(|| String::from("ghost"));
+
+ sqlx::query(
+ "REPLACE INTO issues (repo, number, state, title, body, user_login, html_url, updated_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
+ ).bind(repo).bind(issue.number)
+ .bind(state_to_integer(issue.state)).bind(issue.title).bind(issue.body_html)
+ .bind(author).bind(issue.url).bind(ts)
+ .execute(&mut conn)
+ .await?;
+
+ sqlx::query(
+ "DELETE FROM is_labeled WHERE repo=? AND issue=?"
+ ).bind(repo).bind(issue.number)
+ .execute(&mut conn)
+ .await?;
+
+ let labels = issue.labels
+ .map(|l| l.edges)
+ .unwrap_or_default()
+ .unwrap_or_default()
+ .into_iter()
+ .flatten()
+ .map(|l| l.node)
+ .flatten();
+
+ for label in labels {
+ debug!("label: {}", label.name);
+ sqlx::query(
+ "INSERT INTO is_labeled (repo, issue, label) VALUES (?, ?, (SELECT id FROM labels WHERE name=?))"
+ ).bind(repo).bind(issue.number).bind(label.name)
+ .execute(&mut conn)
+ .await?;
+ }
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/src/query/labels.rs b/src/query/labels.rs
new file mode 100644
index 0000000..09774c6
--- /dev/null
+++ b/src/query/labels.rs
@@ -0,0 +1,65 @@
+use graphql_client::{ GraphQLQuery, Response };
+use reqwest::Client;
+
+use tracing::{ error, debug };
+
+use crate::{ Conn, query::* };
+
+type URI = String;
+
+#[derive(GraphQLQuery)]
+#[graphql(
+ // curl https://api.github.com/graphql -H 'Authorization: bearer ...'
+ schema_path = "graphql/github.json",
+ query_path = "graphql/labels.graphql",
+ response_derives = "Debug"
+)]
+pub struct RepoLabels;
+
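+/// Fetches the repository's labels via the GitHub GraphQL API and inserts any not yet stored.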
+pub async fn update(mut conn: &mut Conn, github_api_token: &str, (ref owner, ref name): (String, String)) -> anyhow::Result<()> {
+ let repo = repo_id(&mut conn, owner, name).await?;
+
+ let client = Client::new();
+
+ let mut has_next_page = true;
+ let mut last_cursor = None;
+ while has_next_page {
+ let query = RepoLabels::build_query(repo_labels::Variables {
+ owner: owner.to_owned(),
+ name: name.to_owned(),
+ after: last_cursor.clone()
+ });
+
+ let res = graphql::query(&client, github_api_token, query).await?;
+ let response: Response<repo_labels::ResponseData> = res.json().await?;
+
+ for error in response.errors.unwrap_or_default() {
+ error!("{:?}", error);
+ }
+
+ let repository = response.data
+ .expect("Missing response data")
+ .repository
+ .expect("Missing repository");
+
+ if repository.labels.is_none() { break }
+ let labels = repository.labels.unwrap();
+ has_next_page = labels.page_info.has_next_page;
+ debug!("has_next_page: {}", has_next_page);
+ let labels = labels.edges.unwrap_or_default();
+
+ for label in labels.into_iter().flatten() {
+ last_cursor = Some(label.cursor);
+ if let Some(label) = label.node {
+ debug!("{}: {}", repo, label.name);
+ sqlx::query(
+ "INSERT OR IGNORE INTO labels (repo, name) VALUES (?, ?)"
+ ).bind(repo).bind(label.name)
+ .execute(&mut conn)
+ .await?;
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/src/query/mod.rs b/src/query/mod.rs
new file mode 100644
index 0000000..4e472c5
--- /dev/null
+++ b/src/query/mod.rs
@@ -0,0 +1,87 @@
+use sqlx::prelude::*;
+use anyhow::{ Result, Context };
+
+use crate::Conn;
+
+pub mod issues;
+pub mod labels;
+
+#[derive(sqlx::FromRow, sqlx::Type)]
+pub struct RepositoryInfo {
+ pub owner: String,
+ pub name: String,
+ pub label_count: i64,
+ pub issue_count: i64
+}
+
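+/// Returns the database id for `owner`/`name`, inserting the repository row if it does not exist yet.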
+pub async fn repo_id(conn: &mut Conn, owner: &str, name: &str) -> Result<i64> {
+ sqlx::query_as::<_, (i64,)>(
+ "INSERT OR IGNORE INTO repositories (owner, name) VALUES (?, ?);
+ SELECT id FROM repositories WHERE owner = ? AND name = ?"
+ ).bind(owner).bind(name)
+ .bind(owner).bind(name)
+ .fetch_one(conn)
+ .await
+ .map(|(id,)| id)
+ .with_context(|| format!("Couldn't find repo '{}/{}' in database", owner, name))
+}
+
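+/// Returns the most recent `updated_at` timestamp stored for the repository, if any issues are present.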
+async fn last_updated(conn: &mut Conn, repo: i64) -> Result<Option<i64>> {
+ sqlx::query_as::<_, (i64,)>(
+ "SELECT MAX(updated_at) FROM issues WHERE repo = ?",
+ ).bind(repo)
+ .fetch_optional(conn)
+ .await
+ .map(|opt| opt.map(|row| row.0))
+ .with_context(|| format!("Couldn't find time of last update for repo id {}", repo))
+}
+
+pub async fn list_repositories(db: &mut Conn) -> sqlx::Result<Vec<RepositoryInfo>> {
+ sqlx::query_as(
+ "SELECT repositories.owner, repositories.name,
+ (SELECT count(id) FROM labels WHERE repo = repositories.id) AS label_count,
+ (SELECT count(number) FROM issues WHERE repo = repositories.id) AS issue_count
+ FROM repositories"
+ ).fetch_all(db)
+ .await
+}
+
+pub mod graphql {
+ use std::time::Duration;
+ use reqwest::header;
+ use serde::Serialize;
+ use futures_retry::{ ErrorHandler, RetryPolicy, FutureRetry };
+ use graphql_client::QueryBody;
+
+ static API_ENDPOINT: &str = "https://api.github.com/graphql";
+ static USER_AGENT: &str = "github.com/tilpner/github-label-feed";
+
+ static RETRY_DELAY: &[u64] = &[ 5, 50, 250, 1000, 5000, 25000 ];
+
+ pub struct RetryStrategy;
+ impl ErrorHandler<reqwest::Error> for RetryStrategy {
+ type OutError = reqwest::Error;
+
+ fn handle(&mut self, attempt: usize, e: reqwest::Error) -> RetryPolicy<Self::OutError> {
+ match RETRY_DELAY.get(attempt) {
+ Some(&ms) => RetryPolicy::WaitRetry(Duration::from_millis(ms)),
+ None => RetryPolicy::ForwardError(e)
+ }
+ }
+ }
+
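+ /// POSTs a GraphQL query to the GitHub API, retrying transient request failures
+ /// with the increasing delays from RETRY_DELAY.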
+ pub async fn query(client: &reqwest::Client, api_token: &str, query: QueryBody<impl Serialize>) -> reqwest::Result<reqwest::Response> {
+ FutureRetry::new(|| {
+ client
+ .post(API_ENDPOINT)
+ .timeout(Duration::from_secs(60))
+ .header(header::USER_AGENT, USER_AGENT)
+ .bearer_auth(api_token)
+ .json(&query)
+ .send()
+ }, RetryStrategy)
+ .await
+ .map(|(res, _)| res)
+ .map_err(|(e, _)| e)
+ }
+}