def create_alembic_config(url: Union[str, URL]):
    """Create our alembic configuration object, to run alembic commands.

    :param url: Database URL, either as a string or as a SQLAlchemy ``URL``; it is normalized with ``make_url``.
    :return: An alembic ``Config`` whose script location is the ``migrations`` directory of the
        ``sqlalchemy_storage`` package and whose ``sqlalchemy.url`` is the given URL, password included.
    """
    from alembic.config import Config as AlembicConfig

    url = make_url(url)

    alembic_cfg = AlembicConfig()
    alembic_cfg.set_main_option(
        "script_location",
        os.path.join(sqlalchemy_storage.__path__[0], "migrations"),
    )
    # Percent signs are doubled because main options go through ConfigParser interpolation.
    alembic_cfg.set_main_option(
        "file_template",
        "%%(year)d%%(month).2d%%(day).2d%%(hour).2d%%(minute).2d%%(second).2d_%%(rev)s_%%(slug)s",
    )
    alembic_cfg.set_main_option("truncate_slug_length", "40")
    alembic_cfg.set_main_option("output_encoding", "utf-8")
    # The rendered URL may contain "%" (URL-encoded characters in credentials, e.g. "%40" for "@"). It needs
    # the same escaping as the file template above, otherwise setting the option fails on interpolation.
    alembic_cfg.set_main_option(
        "sqlalchemy.url",
        url.render_as_string(hide_password=False).replace("%", "%%"),
    )
    alembic_cfg.set_main_option("configure_logger", "false")

    return alembic_cfg
def create_harp_config_with_sqlalchemy_storage_from_command_line_options(kwargs):
    """Build a Harp configuration from the common server command line options (--set...).

    The ``sqlalchemy_storage`` application is registered on the configuration before the options are read, so
    that the storage ends up properly configured. Validation is run with extraneous settings allowed.

    :param kwargs: Mapping of keyword arguments used to construct ``CommonServerOptions``.
    :return: The validated Harp configuration object.
    """
    from harp import Config as HarpConfig

    server_options = CommonServerOptions(**kwargs)

    harp_config = HarpConfig()
    harp_config.add_application("sqlalchemy_storage")
    harp_config.read_env(server_options)
    harp_config.validate(allow_extraneous_settings=True)

    return harp_config
async def do_reset(engine):
    """Drop every table registered on ``Base.metadata``, then the ``alembic_version`` table if it exists.

    Both operations run on a single connection obtained from ``engine.begin()``.

    :param engine: Async engine to run the statements on.
    """
    logger.info("🛢 [db:reset] dropping all tables.")

    drop_alembic_version = text("DROP TABLE IF EXISTS alembic_version;")

    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.drop_all)
        await connection.execute(drop_alembic_version)
async def _create_mysql_fulltext_index(engine, index_name, table_name, column_name):
    """Create one MySQL FULLTEXT index, tolerating the error raised when it already exists."""
    try:
        # No explicit commit: the ``engine.begin()`` block commits when it exits without error.
        async with engine.begin() as conn:
            await conn.execute(text(f"CREATE FULLTEXT INDEX {index_name} ON {table_name} ({column_name});"))
    except OperationalError as e:
        # 1061 is MySQL's "duplicate key name" error: the index is already there, nothing to do.
        if e.orig and e.orig.args and e.orig.args[0] == 1061:
            return
        raise


async def do_migrate(engine, *, migrator, reset=False):
    """Bring the database schema up to date.

    :param engine: Async engine for the target database; its dialect name selects the strategy.
    :param migrator: Synchronous callable that runs the alembic migrations. It is executed in a worker thread,
        and only when the dialect is not sqlite. If falsy, no migration is run.
    :param reset: When true, ``do_reset`` is awaited first.
    :raises OperationalError: If creating a MySQL fulltext index fails for any reason other than the index
        already existing.
    """
    logger.info(f"🛢 Starting database migrations... (dialect={engine.dialect.name}, reset={reset}).")

    if reset:
        await do_reset(engine)

    if engine.dialect.name == "sqlite":
        logger.debug("🛢 [db:migrate dialect=sqlite] creating all tables (without alembic).")
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
    elif migrator:
        # alembic manages migrations except for sqlite, because it's not trivial to make them work and an env
        # using sqlite does not really need to support upgrades (drop/recreate is fine when harp is upgraded).
        logger.debug("🛢 [db:migrate] Running alembic migrations...")
        with ThreadPoolExecutor() as executor:
            await asyncio.get_event_loop().run_in_executor(executor, migrator)

    if engine.dialect.name == "mysql":
        logger.debug("🛢 [db:migrate dialect=mysql] creating fulltext indexes.")
        # Each index is created on its own, so that one index already existing does not prevent the other
        # from being created.
        await _create_mysql_fulltext_index(engine, "endpoint_ft_index", Transaction.__tablename__, "endpoint")
        # Create the full text index for messages.summary
        await _create_mysql_fulltext_index(engine, "summary_ft_index", Message.__tablename__, "summary")

    logger.debug("🛢 [db:migrate] Done.")